[prev in list] [next in list] [prev in thread] [next in thread] 

List:       rhq-commits
Subject:    [rhq] Branch 'jsanda/aggregation' - 3 commits - modules/enterprise
From:       John Sanda <jsanda () fedoraproject ! org>
Date:       2013-12-19 18:44:29
Message-ID: 20131219184429.D2FC360A3B () fedorahosted ! org
[Download RAW message or body]

 modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate1HourData.java \
|  109 --  modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate6HourData.java \
|   66 -  modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateIndexEntriesHandler.java \
|   62 -  modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateRawData.java \
|  116 ---  modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregationState.java \
|  255 ------  modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregator.java \
|  331 --------  modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute1HourData.java \
|  104 --  modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute24HourData.java \
|   91 --  modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute6HourData.java \
|   97 --  modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java \
|    4   modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java \
|  127 +++  modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java \
|   84 ++  modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java \
|   73 +  modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java \
|  133 +++  modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java \
|  257 ++++++  modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java \
|  378 ++++++++++  modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute1HourData.java \
|  113 ++  modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute24HourData.java \
|   99 ++  modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute6HourData.java \
|  106 ++  modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java \
|    1   20 files changed, 1373 insertions(+), 1233 deletions(-)

New commits:
commit 1339dad67f4d008561bbb99b7fd5bec8bc90eca3
Author: John Sanda <jsanda@redhat.com>
Date:   Thu Dec 19 13:41:10 2013 -0500

    [BZ 1009945] turn on async aggregation by default

diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
 index 7f5b639..d319146 100644
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
                
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
 @@ -92,7 +92,7 @@ public class MetricsServer {
 
     private int aggregationBatchSize;
 
-    private boolean useAsyncAggregation = \
System.getProperty("rhq.metrics.aggregation.async") != null; +    private boolean \
useAsyncAggregation = \
Boolean.valueOf(System.getProperty("rhq.metrics.aggregation.async", "true"));  
     public void setDAO(MetricsDAO dao) {
         this.dao = dao;


commit 0852a2f987de1af4c409067da2a13b6064a640bc
Author: John Sanda <jsanda@redhat.com>
Date:   Thu Dec 19 11:11:50 2013 -0500

    [BZ 1009945] updating test with package refactoring

diff --git a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java \
b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java
 index 00803db..7d66cef 100644
--- a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java
                
+++ b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java
 @@ -26,6 +26,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import org.rhq.core.domain.measurement.MeasurementDataNumeric;
+import org.rhq.server.metrics.aggregation.Aggregator;
 import org.rhq.server.metrics.domain.AggregateNumericMetric;
 import org.rhq.server.metrics.domain.AggregateType;
 import org.rhq.server.metrics.domain.MetricsIndexEntry;


commit 4da0ad97bbcb2e36c70d47a96e8345825bb72615
Author: John Sanda <jsanda@redhat.com>
Date:   Thu Dec 19 11:05:14 2013 -0500

    [BZ 1009945] cleaning up logging and adding some javadocs
    
    Also moving all aggregation related classes into the \
org.rhq.server.metrics.aggregation  package. All classes except Aggregator now have \
package-level access.

diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate1HourData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate1HourData.java
 deleted file mode 100644
index 4a5e78b..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate1HourData.java
                
+++ /dev/null
@@ -1,109 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import com.datastax.driver.core.ResultSet;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.FutureFallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * Generates 6 hour data for a batch of 1 hour data futures. After data is inserted \
                for the batch, aggregation of 6 hour
- * data will start immediately for the batch if the 24 hour time slice has finished.
- *
- * @see Compute6HourData
- * @author John Sanda
- */
-public class Aggregate1HourData implements Runnable {
-
-    private final Log log = LogFactory.getLog(Aggregate1HourData.class);
-
-    private MetricsDAO dao;
-
-    private AggregationState state;
-
-    private Set<Integer> scheduleIds;
-
-    private List<StorageResultSetFuture> queryFutures;
-
-    public Aggregate1HourData(MetricsDAO dao, AggregationState state, Set<Integer> \
                scheduleIds,
-        List<StorageResultSetFuture> queryFutures) {
-        this.dao = dao;
-        this.state = state;
-        this.scheduleIds = scheduleIds;
-        this.queryFutures = queryFutures;
-    }
-
-    @Override
-    public void run() {
-        final long start = System.currentTimeMillis();
-        ListenableFuture<List<ResultSet>> queriesFuture = \
                Futures.successfulAsList(queryFutures);
-        Futures.withFallback(queriesFuture, new FutureFallback<List<ResultSet>>() {
-            @Override
-            public ListenableFuture<List<ResultSet>> create(Throwable t) throws \
                Exception {
-                log.error("An error occurred while fetching one hour data", t);
-                return Futures.immediateFailedFuture(t);
-            }
-        });
-        ListenableFuture<List<ResultSet>> computeFutures = \
                Futures.transform(queriesFuture,
-            state.getCompute6HourData(), state.getAggregationTasks());
-        Futures.addCallback(computeFutures, new FutureCallback<List<ResultSet>>() {
-            @Override
-            public void onSuccess(List<ResultSet> result) {
-                log.debug("Finished aggregating 1 hour data for " + result.size() + \
                " schedules in " +
-                    (System.currentTimeMillis() - start) + " ms");
-                start6HourDataAggregationIfNecessary();
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                log.warn("Failed to aggregate 1 hour data", t);
-                start6HourDataAggregationIfNecessary();
-            }
-        });
-    }
-
-    private void start6HourDataAggregationIfNecessary() {
-        try {
-            if (state.is24HourTimeSliceFinished()) {
-                update6HourIndexEntries();
-                List<StorageResultSetFuture> queryFutures = new \
                ArrayList<StorageResultSetFuture>(scheduleIds.size());
-                for (Integer scheduleId : scheduleIds) {
-                    queryFutures.add(dao.findSixHourMetricsAsync(scheduleId, \
                state.getTwentyFourHourTimeSlice().getMillis(),
-                        state.getTwentyFourHourTimeSliceEnd().getMillis()));
-                }
-                state.getAggregationTasks().submit(new Aggregate6HourData(dao, \
                state, scheduleIds, queryFutures));
-            }
-        } catch (InterruptedException e) {
-            log.debug("An interrupt occurred while waiting for 6 hour data index \
                entries. Aborting data aggregation",
-                e);
-            log.info("An interrupt occurred while waiting for 6 hour data index \
                entries. Aborting data aggregation: " +
-                e.getMessage());
-        } finally {
-            state.getRemaining1HourData().addAndGet(-scheduleIds.size());
-        }
-    }
-
-    private void update6HourIndexEntries() throws InterruptedException {
-        try {
-            state.getSixHourIndexEntriesArrival().await();
-            try {
-                state.getSixHourIndexEntriesLock().writeLock().lock();
-                state.getSixHourIndexEntries().removeAll(scheduleIds);
-            } finally {
-                state.getSixHourIndexEntriesLock().writeLock().unlock();
-            }
-        } catch (AbortedException e) {
-            // This means we failed to retrieve the index entries. We can however
-            // continue generating 6 hour data because we do not need the index
-            // here since we already have 6 hour data to aggregate along with the
-            // schedule ids.
-        }
-    }
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate6HourData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate6HourData.java
 deleted file mode 100644
index fc8c0cb..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate6HourData.java
                
+++ /dev/null
@@ -1,66 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.List;
-import java.util.Set;
-
-import com.datastax.driver.core.ResultSet;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.FutureFallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * @author John Sanda
- */
-public class Aggregate6HourData implements Runnable {
-
-    private final Log log = LogFactory.getLog(Aggregate6HourData.class);
-
-    private MetricsDAO dao;
-
-    private AggregationState state;
-
-    private Set<Integer> scheduleIds;
-
-    private List<StorageResultSetFuture> queryFutures;
-
-    public Aggregate6HourData(MetricsDAO dao, AggregationState state, Set<Integer> \
                scheduleIds,
-        List<StorageResultSetFuture> queryFutures) {
-        this.dao = dao;
-        this.state = state;
-        this.scheduleIds = scheduleIds;
-        this.queryFutures = queryFutures;
-    }
-
-    @Override
-    public void run() {
-        final long start = System.currentTimeMillis();
-        ListenableFuture<List<ResultSet>> queriesFuture = \
                Futures.successfulAsList(queryFutures);
-        Futures.withFallback(queriesFuture, new FutureFallback<List<ResultSet>>() {
-            @Override
-            public ListenableFuture<List<ResultSet>> create(Throwable t) throws \
                Exception {
-                log.error("An error occurred while fetching 6 hour data", t);
-                return Futures.immediateFailedFuture(t);
-            }
-        });
-        ListenableFuture<List<ResultSet>> computeFutures = \
                Futures.transform(queriesFuture,
-            state.getCompute24HourData(), state.getAggregationTasks());
-        Futures.addCallback(computeFutures, new FutureCallback<List<ResultSet>>() {
-            @Override
-            public void onSuccess(List<ResultSet> result) {
-                log.debug("Finished aggregating 6 hour data for " + result.size() + \
                " schedules in " +
-                    (System.currentTimeMillis() - start) + " ms");
-                state.getRemaining6HourData().addAndGet(-scheduleIds.size());
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                log.warn("Failed to aggregate 6 hour data", t);
-                state.getRemaining6HourData().addAndGet(-scheduleIds.size());
-            }
-        });
-    }
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateIndexEntriesHandler.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateIndexEntriesHandler.java
 deleted file mode 100644
index 1eb9e53..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateIndexEntriesHandler.java
                
+++ /dev/null
@@ -1,62 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.google.common.util.concurrent.FutureCallback;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
-* @author John Sanda
-*/
-class AggregateIndexEntriesHandler implements FutureCallback<ResultSet> {
-
-    private final Log log = LogFactory.getLog(AggregateIndexEntriesHandler.class);
-
-    private Set<Integer> indexEntries;
-
-    private AtomicInteger remainingData;
-
-    private SignalingCountDownLatch indexEntriesArrival;
-
-    private long startTime;
-
-    private String src;
-
-    private String dest;
-
-    public AggregateIndexEntriesHandler(Set<Integer> indexEntries, AtomicInteger \
                remainingData,
-        SignalingCountDownLatch indexEntriesArrival, long startTime, String src, \
                String dest) {
-        this.indexEntries = indexEntries;
-        this.remainingData = remainingData;
-        this.indexEntriesArrival = indexEntriesArrival;
-        this.startTime = startTime;
-        this.src = src;
-        this.dest = dest;
-    }
-
-    @Override
-    public void onSuccess(ResultSet resultSet) {
-        for (Row row : resultSet) {
-            indexEntries.add(row.getInt(1));
-        }
-        remainingData.set(indexEntries.size());
-        indexEntriesArrival.countDown();
-        if (log.isDebugEnabled()) {
-            log.debug("Finished loading " + indexEntries.size() + " " + src + " \
                index entries in " +
-                (System.currentTimeMillis() - startTime) + " ms");
-        }
-    }
-
-    @Override
-    public void onFailure(Throwable t) {
-        log.warn("Failed to retrieve " + src + " index entries. Some " + dest + " \
                aggregates may not get generated.",
-            t);
-        remainingData.set(0);
-        indexEntriesArrival.abort();
-    }
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateRawData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateRawData.java
 deleted file mode 100644
index dbfc289..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateRawData.java
                
+++ /dev/null
@@ -1,116 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import com.datastax.driver.core.ResultSet;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.FutureFallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * Generates 1 hour data for a batch of raw data futures. After data is inserted for \
                the batch, aggregation of 1 hour
- * data will start immediately for the batch if the 6 hour time slice has finished.
- *
- * @see Compute1HourData
- * @author John Sanda
- */
-public class AggregateRawData implements Runnable {
-
-    private final Log log = LogFactory.getLog(AggregateRawData.class);
-
-    private MetricsDAO dao;
-
-    private AggregationState state;
-
-    private Set<Integer> scheduleIds;
-
-    private List<StorageResultSetFuture> queryFutures;
-
-    public AggregateRawData(MetricsDAO dao, AggregationState state, Set<Integer> \
                scheduleIds,
-        List<StorageResultSetFuture> queryFutures) {
-        this.dao = dao;
-        this.state = state;
-        this.scheduleIds = scheduleIds;
-        this.queryFutures = queryFutures;
-    }
-
-    @Override
-    public void run() {
-        final long start = System.currentTimeMillis();
-        ListenableFuture<List<ResultSet>> rawDataFutures = \
                Futures.successfulAsList(queryFutures);
-        Futures.withFallback(rawDataFutures, new FutureFallback<List<ResultSet>>() {
-            @Override
-            public ListenableFuture<List<ResultSet>> create(Throwable t) throws \
                Exception {
-                log.error("An error occurred while fetching raw data", t);
-                return Futures.immediateFailedFuture(t);
-            }
-        });
-
-        final ListenableFuture<List<ResultSet>> insert1HourDataFutures = \
                Futures.transform(rawDataFutures,
-            state.getCompute1HourData(), state.getAggregationTasks());
-        Futures.addCallback(insert1HourDataFutures, new \
                FutureCallback<List<ResultSet>>() {
-            @Override
-            public void onSuccess(List<ResultSet> resultSets) {
-                log.debug("Finished aggregating raw data for " + resultSets.size() + \
                " schedules in " +
-                    (System.currentTimeMillis() - start) + " ms");
-                start1HourDataAggregationIfNecessary();
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                log.warn("Failed to aggregate raw data", t);
-                // TODO maybe add debug statement to log those schedule ids for \
                which aggregation failed
-                start1HourDataAggregationIfNecessary();
-            }
-        }, state.getAggregationTasks());
-    }
-
-    private void start1HourDataAggregationIfNecessary() {
-        try {
-            if (state.is6HourTimeSliceFinished()) {
-                update1HourIndexEntries();
-                List<StorageResultSetFuture> oneHourDataQueryFutures = new \
                ArrayList<StorageResultSetFuture>(
-                    scheduleIds.size());
-                for (Integer scheduleId : scheduleIds) {
-                    \
                oneHourDataQueryFutures.add(dao.findOneHourMetricsAsync(scheduleId,
-                        state.getSixHourTimeSlice().getMillis(), \
                state.getSixHourTimeSliceEnd().getMillis()));
-                }
-                state.getAggregationTasks().submit(new Aggregate1HourData(dao, \
                state, scheduleIds,
-                    oneHourDataQueryFutures));
-            }
-        } catch (InterruptedException e) {
-            log.debug("An interrupt occurred while waiting for 1 hour data index \
                entries. Aborting data aggregation",
-                e);
-            log.info("An interrupt occurred while waiting for 1 hour data index \
                entries. Aborting data aggregation: " +
-                e.getMessage());
-        } finally {
-            state.getRemainingRawData().addAndGet(-scheduleIds.size());
-        }
-    }
-
-    private void update1HourIndexEntries() throws InterruptedException {
-        try {
-            // Wait for the arrival so that we can remove the schedules ids in this
-            // batch from the one hour index entries. This will prevent duplicate \
                tasks
-            // being submitted to process the same 1 hour data.
-            state.getOneHourIndexEntriesArrival().await();
-            try {
-                state.getOneHourIndexEntriesLock().writeLock().lock();
-                state.getOneHourIndexEntries().removeAll(scheduleIds);
-            } finally {
-                state.getOneHourIndexEntriesLock().writeLock().unlock();
-            }
-        } catch (AbortedException e) {
-            // This means we failed to retrieve the index entries. We can however
-            // continue generating 1 hour data because we do not need the index
-            // here since we already have 1 hour data to aggregate along with the
-            // schedule ids.
-        }
-    }
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregationState.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregationState.java
 deleted file mode 100644
index bd1ce0d..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregationState.java
                
+++ /dev/null
@@ -1,255 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.google.common.util.concurrent.ListeningExecutorService;
-
-import org.joda.time.DateTime;
-
-/**
- * @author John Sanda
- */
-public class AggregationState {
-
-    private ListeningExecutorService aggregationTasks;
-
-    private SignalingCountDownLatch oneHourIndexEntriesArrival;
-
-    private SignalingCountDownLatch sixHourIndexEntriesArrival;
-
-    private AtomicInteger remainingRawData;
-
-    private AtomicInteger remaining1HourData;
-
-    private AtomicInteger remaining6HourData;
-
-    private Set<Integer> oneHourIndexEntries;
-
-    private Set<Integer> sixHourIndexEntries;
-
-    private ReentrantReadWriteLock oneHourIndexEntriesLock;
-
-    private ReentrantReadWriteLock sixHourIndexEntriesLock;
-
-    private DateTime oneHourTimeSlice;
-
-    private DateTime sixHourTimeSlice;
-
-    private DateTime sixHourTimeSliceEnd;
-
-    private DateTime twentyFourHourTimeSlice;
-
-    private DateTime twentyFourHourTimeSliceEnd;
-
-    private boolean sixHourTimeSliceFinished;
-
-    private boolean twentyFourHourTimeSliceFinished;
-
-    private Compute1HourData compute1HourData;
-
-    private Compute6HourData compute6HourData;
-
-    private Compute24HourData compute24HourData;
-
-    public ListeningExecutorService getAggregationTasks() {
-        return aggregationTasks;
-    }
-
-    public AggregationState setAggregationTasks(ListeningExecutorService \
                aggregationTasks) {
-        this.aggregationTasks = aggregationTasks;
-        return this;
-    }
-
-    /**
-     * @return A {@link SignalingCountDownLatch} to signal the arrival of index \
                entries for schedules with 1 hour
-     * data to be aggregated
-     */
-    public SignalingCountDownLatch getOneHourIndexEntriesArrival() {
-        return oneHourIndexEntriesArrival;
-    }
-
-    public AggregationState setOneHourIndexEntriesArrival(SignalingCountDownLatch \
                oneHourIndexEntriesArrival) {
-        this.oneHourIndexEntriesArrival = oneHourIndexEntriesArrival;
-        return this;
-    }
-
-    /**
-     * @return A {@link SignalingCountDownLatch} to signal the arrival of index \
                entries for schedules with 6 hour
-     * data to be aggregated
-     */
-    public SignalingCountDownLatch getSixHourIndexEntriesArrival() {
-        return sixHourIndexEntriesArrival;
-    }
-
-    public AggregationState setSixHourIndexEntriesArrival(SignalingCountDownLatch \
                sixHourIndexEntriesArrival) {
-        this.sixHourIndexEntriesArrival = sixHourIndexEntriesArrival;
-        return this;
-    }
-
-    /**
-     * @return The remaining number of schedules with raw data to be aggregated
-     */
-    public AtomicInteger getRemainingRawData() {
-        return remainingRawData;
-    }
-
-    public AggregationState setRemainingRawData(AtomicInteger remainingRawData) {
-        this.remainingRawData = remainingRawData;
-        return this;
-    }
-
-    /**
-     * @return The remaining number of schedules with 1 hour data to be aggregated
-     */
-    public AtomicInteger getRemaining1HourData() {
-        return remaining1HourData;
-    }
-
-    public AggregationState setRemaining1HourData(AtomicInteger remaining1HourData) \
                {
-        this.remaining1HourData = remaining1HourData;
-        return this;
-    }
-
-    /**
-     * @return The remaining number of schedules with 6 hour data to be aggregated
-     */
-    public AtomicInteger getRemaining6HourData() {
-        return remaining6HourData;
-    }
-
-    public AggregationState setRemaining6HourData(AtomicInteger remaining6HourData) \
                {
-        this.remaining6HourData = remaining6HourData;
-        return this;
-    }
-
-    /**
-     * @return The schedule ids with 1 hour data to be aggregated
-     */
-    public Set<Integer> getOneHourIndexEntries() {
-        return oneHourIndexEntries;
-    }
-
-    public AggregationState setOneHourIndexEntries(Set<Integer> oneHourIndexEntries) \
                {
-        this.oneHourIndexEntries = oneHourIndexEntries;
-        return this;
-    }
-
-    public Set<Integer> getSixHourIndexEntries() {
-        return sixHourIndexEntries;
-    }
-
-    public AggregationState setSixHourIndexEntries(Set<Integer> sixHourIndexEntries) \
                {
-        this.sixHourIndexEntries = sixHourIndexEntries;
-        return this;
-    }
-
-    public ReentrantReadWriteLock getOneHourIndexEntriesLock() {
-        return oneHourIndexEntriesLock;
-    }
-
-    public AggregationState setOneHourIndexEntriesLock(ReentrantReadWriteLock \
                oneHourIndexEntriesLock) {
-        this.oneHourIndexEntriesLock = oneHourIndexEntriesLock;
-        return this;
-    }
-
-    public ReentrantReadWriteLock getSixHourIndexEntriesLock() {
-        return sixHourIndexEntriesLock;
-    }
-
-    public AggregationState setSixHourIndexEntriesLock(ReentrantReadWriteLock \
                sixHourIndexEntriesLock) {
-        this.sixHourIndexEntriesLock = sixHourIndexEntriesLock;
-        return this;
-    }
-
-    public DateTime getOneHourTimeSlice() {
-        return oneHourTimeSlice;
-    }
-
-    public AggregationState setOneHourTimeSlice(DateTime oneHourTimeSlice) {
-        this.oneHourTimeSlice = oneHourTimeSlice;
-        return this;
-    }
-
-    public DateTime getSixHourTimeSlice() {
-        return sixHourTimeSlice;
-    }
-
-    public AggregationState setSixHourTimeSlice(DateTime sixHourTimeSlice) {
-        this.sixHourTimeSlice = sixHourTimeSlice;
-        return this;
-    }
-
-    public DateTime getSixHourTimeSliceEnd() {
-        return sixHourTimeSliceEnd;
-    }
-
-    public AggregationState setSixHourTimeSliceEnd(DateTime sixHourTimeSliceEnd) {
-        this.sixHourTimeSliceEnd = sixHourTimeSliceEnd;
-        return this;
-    }
-
-    public DateTime getTwentyFourHourTimeSlice() {
-        return twentyFourHourTimeSlice;
-    }
-
-    public AggregationState setTwentyFourHourTimeSlice(DateTime \
                twentyFourHourTimeSlice) {
-        this.twentyFourHourTimeSlice = twentyFourHourTimeSlice;
-        return this;
-    }
-
-    public DateTime getTwentyFourHourTimeSliceEnd() {
-        return twentyFourHourTimeSliceEnd;
-    }
-
-    public AggregationState setTwentyFourHourTimeSliceEnd(DateTime \
                twentyFourHourTimeSliceEnd) {
-        this.twentyFourHourTimeSliceEnd = twentyFourHourTimeSliceEnd;
-        return this;
-    }
-
-    public boolean is6HourTimeSliceFinished() {
-        return sixHourTimeSliceFinished;
-    }
-
-    public AggregationState set6HourTimeSliceFinished(boolean \
                is6HourTimeSliceFinished) {
-        this.sixHourTimeSliceFinished = is6HourTimeSliceFinished;
-        return this;
-    }
-
-    public boolean is24HourTimeSliceFinished() {
-        return twentyFourHourTimeSliceFinished;
-    }
-
-    public AggregationState set24HourTimeSliceFinished(boolean \
                is24HourTimeSliceFinished) {
-        this.twentyFourHourTimeSliceFinished = is24HourTimeSliceFinished;
-        return this;
-    }
-
-    public Compute1HourData getCompute1HourData() {
-        return compute1HourData;
-    }
-
-    public AggregationState setCompute1HourData(Compute1HourData compute1HourData) {
-        this.compute1HourData = compute1HourData;
-        return this;
-    }
-
-    public Compute6HourData getCompute6HourData() {
-        return compute6HourData;
-    }
-
-    public AggregationState setCompute6HourData(Compute6HourData compute6HourData) {
-        this.compute6HourData = compute6HourData;
-        return this;
-    }
-
-    public Compute24HourData getCompute24HourData() {
-        return compute24HourData;
-    }
-
-    public AggregationState setCompute24HourData(Compute24HourData \
                compute24HourData) {
-        this.compute24HourData = compute24HourData;
-        return this;
-    }
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregator.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregator.java
 deleted file mode 100644
index 4ff222a..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregator.java
                
+++ /dev/null
@@ -1,331 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeComparator;
-import org.joda.time.Duration;
-
-import org.rhq.server.metrics.domain.AggregateNumericMetric;
-import org.rhq.server.metrics.domain.MetricsTable;
-
-/**
- * @author John Sanda
- */
-public class Aggregator {
-
-    private static final Comparator<AggregateNumericMetric> AGGREGATE_COMPARATOR = \
                new Comparator<AggregateNumericMetric>() {
-        @Override
-        public int compare(AggregateNumericMetric left, AggregateNumericMetric \
                right) {
-            return (left.getScheduleId() < right.getScheduleId()) ? -1 : \
                ((left.getScheduleId() == right.getScheduleId()) ? 0 : 1);
-        }
-    };
-
-    private final Log log = LogFactory.getLog(Aggregator.class);
-
-    private MetricsDAO dao;
-
-    private MetricsConfiguration configuration;
-
-    private DateTimeService dtService;
-
-    private DateTime startTime;
-
-    /**
-     * Signals when raw data index entries (in metrics_index) can be deleted. We \
                cannot delete the row in metrics_index
-     * until we know that it has been read, successfully or otherwise.
-     */
-    private SignalingCountDownLatch rawDataIndexEntriesArrival;
-
-    private RateLimiter readPermits;
-    private RateLimiter writePermits;
-
-    private int batchSize;
-
-    private AggregationState state;
-
-    private Set<AggregateNumericMetric> oneHourData;
-
-    private AtomicInteger remainingIndexEntries;
-
-    public Aggregator(ListeningExecutorService aggregationTasks, MetricsDAO dao, \
                MetricsConfiguration configuration,
-        DateTimeService dtService, DateTime startTime, int batchSize, RateLimiter \
                writePermits,
-        RateLimiter readPermits) {
-        this.dao = dao;
-        this.configuration = configuration;
-        this.dtService = dtService;
-        this.startTime = startTime;
-        this.readPermits = readPermits;
-        this.writePermits = writePermits;
-        this.batchSize = batchSize;
-        oneHourData = new \
                ConcurrentSkipListSet<AggregateNumericMetric>(AGGREGATE_COMPARATOR);
-        rawDataIndexEntriesArrival = new SignalingCountDownLatch(new \
                CountDownLatch(1));
-        remainingIndexEntries = new AtomicInteger(1);
-
-        DateTime sixHourTimeSlice = get6HourTimeSlice();
-        DateTime twentyFourHourTimeSlice = get24HourTimeSlice();
-
-        state = new AggregationState()
-            .setAggregationTasks(aggregationTasks)
-            .setOneHourTimeSlice(startTime)
-            .setSixHourTimeSlice(sixHourTimeSlice)
-            .setSixHourTimeSliceEnd(sixHourTimeSlice.plus(configuration.getOneHourTimeSliceDuration()))
                
-            .setTwentyFourHourTimeSlice(twentyFourHourTimeSlice)
-            .setTwentyFourHourTimeSliceEnd(twentyFourHourTimeSlice.plus(configuration.getSixHourTimeSliceDuration()))
                
-            .setCompute1HourData(new Compute1HourData(startTime, sixHourTimeSlice, \
                writePermits, dao, oneHourData))
-            .setCompute6HourData(new Compute6HourData(sixHourTimeSlice, \
                twentyFourHourTimeSlice, writePermits, dao))
-            .setCompute24HourData(new Compute24HourData(twentyFourHourTimeSlice, \
                writePermits, dao))
-            .set6HourTimeSliceFinished(hasTimeSliceEnded(sixHourTimeSlice, \
                configuration.getOneHourTimeSliceDuration()))
-            .set24HourTimeSliceFinished(hasTimeSliceEnded(twentyFourHourTimeSlice,
-                configuration.getSixHourTimeSliceDuration()))
-            .setRemainingRawData(new AtomicInteger(0))
-            .setRemaining1HourData(new AtomicInteger(0))
-            .setRemaining6HourData(new AtomicInteger(0))
-            .setOneHourIndexEntries(new TreeSet<Integer>())
-            .setSixHourIndexEntries(new TreeSet<Integer>())
-            .setOneHourIndexEntriesLock(new ReentrantReadWriteLock())
-            .setSixHourIndexEntriesLock(new ReentrantReadWriteLock());
-
-        if (state.is6HourTimeSliceFinished()) {
-            state.setOneHourIndexEntriesArrival(new SignalingCountDownLatch(new \
                CountDownLatch(1)));
-            remainingIndexEntries.incrementAndGet();
-        } else {
-            state.setOneHourIndexEntriesArrival(new SignalingCountDownLatch(new \
                CountDownLatch(0)));
-            state.setRemaining1HourData(new AtomicInteger(0));
-        }
-
-        if (state.is24HourTimeSliceFinished()) {
-            state.setSixHourIndexEntriesArrival(new SignalingCountDownLatch(new \
                CountDownLatch(1)));
-            remainingIndexEntries.incrementAndGet();
-        } else {
-            state.setSixHourIndexEntriesArrival(new SignalingCountDownLatch(new \
                CountDownLatch(0)));
-            state.setRemaining6HourData(new AtomicInteger(0));
-        }
-    }
-
-    private DateTime get24HourTimeSlice() {
-        return dtService.getTimeSlice(startTime, \
                configuration.getSixHourTimeSliceDuration());
-    }
-
-    private DateTime get6HourTimeSlice() {
-        return dtService.getTimeSlice(startTime, \
                configuration.getOneHourTimeSliceDuration());
-    }
-
-    private boolean hasTimeSliceEnded(DateTime startTime, Duration duration) {
-        DateTime endTime = startTime.plus(duration);
-        return DateTimeComparator.getInstance().compare(currentHour(), endTime) >= \
                0;
-    }
-
-    protected DateTime currentHour() {
-        return dtService.getTimeSlice(dtService.now(), \
                configuration.getRawTimeSliceDuration());
-    }
-
-    public Set<AggregateNumericMetric> run() {
-        log.info("Starting aggregation for time slice " + startTime);
-        readPermits.acquire();
-        StorageResultSetFuture rawFuture = \
                dao.findMetricsIndexEntriesAsync(MetricsTable.ONE_HOUR,
-            startTime.getMillis());
-        Futures.addCallback(rawFuture, new FutureCallback<ResultSet>() {
-            @Override
-            public void onSuccess(ResultSet result) {
-                List<Row> rows = result.all();
-                state.getRemainingRawData().set(rows.size());
-                rawDataIndexEntriesArrival.countDown();
-
-                log.debug("Starting raw data aggregation for " + rows.size() + " \
                schedules");
-                long start = System.currentTimeMillis();
-                final DateTime endTime = \
                startTime.plus(configuration.getRawTimeSliceDuration());
-                Set<Integer> scheduleIds = new TreeSet<Integer>();
-                List<StorageResultSetFuture> rawDataFutures = new \
                ArrayList<StorageResultSetFuture>(batchSize);
-                for (final Row row : rows) {
-                    scheduleIds.add(row.getInt(1));
-                    readPermits.acquire();
-                    rawDataFutures.add(dao.findRawMetricsAsync(row.getInt(1), \
                startTime.getMillis(),
-                        endTime.getMillis()));
-                    if (rawDataFutures.size() == batchSize) {
-                        state.getAggregationTasks().submit(new AggregateRawData(dao, \
                state, scheduleIds,
-                            rawDataFutures));
-                        rawDataFutures = new ArrayList<StorageResultSetFuture>();
-                        scheduleIds = new TreeSet<Integer>();
-                    }
-                }
-                if (!rawDataFutures.isEmpty()) {
-                    state.getAggregationTasks().submit(new AggregateRawData(dao, \
                state, scheduleIds,
-                        rawDataFutures));
-                }
-                log.debug("Finished processing one hour index entries in " + \
                (System.currentTimeMillis() - start) +
-                    " ms");
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                log.warn("Failed to retrieve raw data index entries. Raw data \
                aggregation for time slice [" +
-                    startTime + "] cannot proceed.", t);
-                state.setRemainingRawData(new AtomicInteger(0));
-                rawDataIndexEntriesArrival.abort();
-                deleteIndexEntries(MetricsTable.ONE_HOUR);
-            }
-        }, state.getAggregationTasks());
-
-        if (state.is6HourTimeSliceFinished()) {
-            long start = System.currentTimeMillis();
-            log.debug("Fetching 1 hour index entries");
-            StorageResultSetFuture oneHourFuture = \
                dao.findMetricsIndexEntriesAsync(MetricsTable.SIX_HOUR,
-                state.getSixHourTimeSlice().getMillis());
-            Futures.addCallback(oneHourFuture, new \
                AggregateIndexEntriesHandler(state.getOneHourIndexEntries(),
-                state.getRemaining1HourData(), \
                state.getOneHourIndexEntriesArrival(), start, "1 hour", "6 hour"),
-                state.getAggregationTasks());
-        }
-
-        if (state.is24HourTimeSliceFinished()) {
-            long start = System.currentTimeMillis();
-            log.debug("Fetching 6 hour index entries");
-            StorageResultSetFuture sixHourFuture = \
                dao.findMetricsIndexEntriesAsync(MetricsTable.TWENTY_FOUR_HOUR,
-                state.getTwentyFourHourTimeSlice().getMillis());
-            Futures.addCallback(sixHourFuture, new \
                AggregateIndexEntriesHandler(state.getSixHourIndexEntries(),
-                state.getRemaining6HourData(), \
                state.getSixHourIndexEntriesArrival(), start, "6 hour", "24 hour"),
-                state.getAggregationTasks());
-        }
-
-        try {
-            try {
-                rawDataIndexEntriesArrival.await();
-                deleteIndexEntries(MetricsTable.ONE_HOUR);
-            } catch (AbortedException e) {
-            }
-
-            if (state.is6HourTimeSliceFinished()) {
-                waitFor(state.getRemainingRawData());
-                try {
-                    state.getOneHourIndexEntriesArrival().await();
-                    deleteIndexEntries(MetricsTable.SIX_HOUR);
-
-                    List<StorageResultSetFuture> queryFutures = new \
                ArrayList<StorageResultSetFuture>(batchSize);
-                    Set<Integer> scheduleIds = new TreeSet<Integer>();
-                    state.getOneHourIndexEntriesLock().writeLock().lock();
-                    for (Integer scheduleId : state.getOneHourIndexEntries()) {
-                        queryFutures.add(dao.findOneHourMetricsAsync(scheduleId, \
                state.getSixHourTimeSlice().getMillis(),
-                            state.getSixHourTimeSliceEnd().getMillis()));
-                        scheduleIds.add(scheduleId);
-                        if (queryFutures.size() == batchSize) {
-                            state.getAggregationTasks().submit(new \
                Aggregate1HourData(dao, state, scheduleIds,
-                                queryFutures));
-                            queryFutures = new \
                ArrayList<StorageResultSetFuture>(batchSize);
-                            scheduleIds = new TreeSet<Integer>();
-                        }
-                    }
-                    if (!queryFutures.isEmpty()) {
-                        state.getAggregationTasks().submit(new \
                Aggregate1HourData(dao, state, scheduleIds,
-                            queryFutures));
-                        queryFutures = null;
-                        scheduleIds = null;
-                    }
-                } catch (AbortedException e) {
-                    log.warn("Failed to load 1 hour index entries. Some 6 hour \
                aggregates may not get generated.", e);
-                } finally {
-                    state.getOneHourIndexEntriesLock().writeLock().unlock();
-                }
-            }
-
-            if (state.is24HourTimeSliceFinished()) {
-                waitFor(state.getRemaining1HourData());
-                try {
-                    state.getSixHourIndexEntriesArrival().await();
-                    deleteIndexEntries(MetricsTable.TWENTY_FOUR_HOUR);
-
-                    List<StorageResultSetFuture> queryFutures = new \
                ArrayList<StorageResultSetFuture>(batchSize);
-                    Set<Integer> scheduleIds = new TreeSet<Integer>();
-                    state.getSixHourIndexEntriesLock().writeLock().lock();
-                    for (Integer scheduleId : state.getSixHourIndexEntries()) {
-                        queryFutures.add(dao.findSixHourMetricsAsync(scheduleId, \
                state.getTwentyFourHourTimeSlice().getMillis(),
-                            state.getTwentyFourHourTimeSliceEnd().getMillis()));
-                        scheduleIds.add(scheduleId);
-                        if (queryFutures.size() == batchSize) {
-                            state.getAggregationTasks().submit(new \
                Aggregate6HourData(dao, state, scheduleIds,
-                                queryFutures));
-                            queryFutures = new \
                ArrayList<StorageResultSetFuture>(batchSize);
-                            scheduleIds = new TreeSet<Integer>();
-                        }
-                    }
-                    if (!queryFutures.isEmpty()) {
-                        log.debug("Submitting 6 hour aggregation task for schedule \
                ids " + scheduleIds);
-                        state.getAggregationTasks().submit(new \
                Aggregate6HourData(dao, state, scheduleIds,
-                            queryFutures));
-                        queryFutures = null;
-                        scheduleIds = null;
-                    }
-                } catch (AbortedException e) {
-                    log.warn("Failed to load 6 hour index entries. Some 24 hour \
                aggregates may not get generated.", e);
-                } finally {
-                    state.getSixHourIndexEntriesLock().writeLock().unlock();
-                }
-            }
-
-            while (!isAggregationFinished()) {
-                Thread.sleep(50);
-            }
-        } catch (InterruptedException e) {
-            log.warn("An interrupt occurred while waiting for aggregation to \
                finish", e);
-        }
-        return oneHourData;
-    }
-
-    private void waitFor(AtomicInteger remainingData) throws InterruptedException {
-        while (remainingData.get() > 0) {
-            Thread.sleep(50);
-        }
-    }
-
-    private boolean isAggregationFinished() {
-        return state.getRemainingRawData().get() <= 0 && \
                state.getRemaining1HourData().get() <= 0 &&
-            state.getRemaining6HourData().get() <= 0 && remainingIndexEntries.get() \
                <= 0;
-    }
-
-    private void deleteIndexEntries(final MetricsTable table) {
-        final DateTime time;
-        switch (table) {
-        case ONE_HOUR:
-            time = startTime;
-            break;
-        case SIX_HOUR:
-            time = state.getSixHourTimeSlice();
-            break;
-        default:
-            time = state.getTwentyFourHourTimeSlice();
-            break;
-        }
-        log.debug("Deleting " + table + " index entries for time slice " + time);
-        writePermits.acquire();
-        StorageResultSetFuture future = dao.deleteMetricsIndexEntriesAsync(table, \
                time.getMillis());
-        Futures.addCallback(future, new FutureCallback<ResultSet>() {
-            @Override
-            public void onSuccess(ResultSet result) {
-                remainingIndexEntries.decrementAndGet();
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                log.warn("Failed to delete index entries for table " + table + " at \
                time [" + time + "]");
-                remainingIndexEntries.decrementAndGet();
-            }
-        });
-    }
-
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute1HourData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute1HourData.java
 deleted file mode 100644
index 7146a92..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute1HourData.java
                
+++ /dev/null
@@ -1,104 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.joda.time.DateTime;
-
-import org.rhq.server.metrics.domain.AggregateNumericMetric;
-import org.rhq.server.metrics.domain.AggregateType;
-import org.rhq.server.metrics.domain.MetricsTable;
-
-/**
- * @author John Sanda
- */
-public class Compute1HourData implements AsyncFunction<List<ResultSet>, \
                List<ResultSet>> {
-
-    private final Log log = LogFactory.getLog(Compute1HourData.class);
-
-    private DateTime startTime;
-
-    private RateLimiter writePermits;
-
-    private MetricsDAO dao;
-
-    private DateTime sixHourTimeSlice;
-
-    private Set<AggregateNumericMetric> oneHourData;
-
-    public Compute1HourData(DateTime startTime, DateTime sixHourTimeSlice, \
                RateLimiter writePermits, MetricsDAO dao,
-        Set<AggregateNumericMetric> oneHourData) {
-        this.startTime = startTime;
-        this.sixHourTimeSlice = sixHourTimeSlice;
-        this.writePermits = writePermits;
-        this.dao = dao;
-        this.oneHourData = oneHourData;
-    }
-
-    @Override
-    public ListenableFuture<List<ResultSet>> apply(List<ResultSet> \
                rawDataResultSets) throws Exception {
-        if (log.isDebugEnabled()) {
-            log.debug("Computing and storing 1 hour data for " + \
                rawDataResultSets.size() + " schedules");
-        }
-        long start = System.currentTimeMillis();
-        try {
-            List<StorageResultSetFuture> insertFutures = new \
                ArrayList<StorageResultSetFuture>(rawDataResultSets.size());
-            for (ResultSet resultSet : rawDataResultSets) {
-                AggregateNumericMetric aggregate = \
                calculateAggregatedRaw(resultSet);
-                oneHourData.add(aggregate);
-                writePermits.acquire(4);
-                insertFutures.add(dao.insertOneHourDataAsync(aggregate.getScheduleId(), \
                aggregate.getTimestamp(),
-                    AggregateType.MIN, aggregate.getMin()));
-                insertFutures.add(dao.insertOneHourDataAsync(aggregate.getScheduleId(), \
                aggregate.getTimestamp(),
-                    AggregateType.MAX, aggregate.getMax()));
-                insertFutures.add(dao.insertOneHourDataAsync(aggregate.getScheduleId(), \
                aggregate.getTimestamp(),
-                    AggregateType.AVG, aggregate.getAvg()));
-                insertFutures.add(dao.updateMetricsIndex(MetricsTable.SIX_HOUR, \
                aggregate.getScheduleId(),
-                    sixHourTimeSlice.getMillis()));
-            }
-            return Futures.successfulAsList(insertFutures);
-        } finally {
-            if (log.isDebugEnabled()) {
-                log.debug("Finished computing and storing 1 hour data for " + \
                rawDataResultSets.size() +
-                    " schedules in " + (System.currentTimeMillis() - start) + " \
                ms");
-            }
-        }
-    }
-
-    private AggregateNumericMetric calculateAggregatedRaw(ResultSet resultSet) {
-        double min = Double.NaN;
-        double max = min;
-        int count = 0;
-        ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();
-        double value;
-        List<Row> rows = resultSet.all();
-
-        for (Row row : rows) {
-            value = row.getDouble(2);
-            if (count == 0) {
-                min = value;
-                max = min;
-            }
-            if (value < min) {
-                min = value;
-            } else if (value > max) {
-                max = value;
-            }
-            mean.add(value);
-            ++count;
-        }
-
-        return new AggregateNumericMetric(rows.get(0).getInt(0), \
                mean.getArithmeticMean(), min, max,
-            startTime.getMillis());
-    }
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute24HourData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute24HourData.java
 deleted file mode 100644
index 6274bd3..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute24HourData.java
                
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.joda.time.DateTime;
-
-import org.rhq.server.metrics.domain.AggregateNumericMetric;
-import org.rhq.server.metrics.domain.AggregateType;
-
-/**
- * @author John Sanda
- */
-public class Compute24HourData implements AsyncFunction<List<ResultSet>, \
                List<ResultSet>> {
-
-    private final Log log = LogFactory.getLog(Compute24HourData.class);
-
-    private DateTime startTime;
-
-    private RateLimiter writePermits;
-
-    private MetricsDAO dao;
-
-    public Compute24HourData(DateTime startTime, RateLimiter writePermits, \
                MetricsDAO dao) {
-        this.startTime = startTime;
-        this.writePermits = writePermits;
-        this.dao = dao;
-    }
-
-    @Override
-    public ListenableFuture<List<ResultSet>> apply(List<ResultSet> \
                sixHourDataResultSets) throws Exception {
-        if (log.isDebugEnabled()) {
-            log.debug("Computing and storing 24 hour data for " + \
                sixHourDataResultSets.size() + " schedules");
-        }
-        long start = System.currentTimeMillis();
-        try {
-            List<StorageResultSetFuture> insertFutures =
-                new ArrayList<StorageResultSetFuture>(sixHourDataResultSets.size());
-            for (ResultSet resultSet : sixHourDataResultSets) {
-                AggregateNumericMetric aggregate = calculateAggregate(resultSet);
-                writePermits.acquire(3);
-                insertFutures.add(dao.insertTwentyFourHourDataAsync(aggregate.getScheduleId(), \
                aggregate.getTimestamp(),
-                    AggregateType.MIN, aggregate.getMin()));
-                insertFutures.add(dao.insertTwentyFourHourDataAsync(aggregate.getScheduleId(), \
                aggregate.getTimestamp(),
-                    AggregateType.MAX, aggregate.getMax()));
-                insertFutures.add(dao.insertTwentyFourHourDataAsync(aggregate.getScheduleId(), \
                aggregate.getTimestamp(),
-                    AggregateType.AVG, aggregate.getAvg()));
-            }
-            return Futures.successfulAsList(insertFutures);
-        } finally {
-            if (log.isDebugEnabled()) {
-                log.debug("Finished computing and storing 24 hour data for " + \
                sixHourDataResultSets.size() +
-                    " schedules in " + (System.currentTimeMillis() - start) + " \
                ms");
-            }
-        }
-    }
-
-    private AggregateNumericMetric calculateAggregate(ResultSet resultSet) {
-        double min = Double.NaN;
-        double max = min;
-        ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();
-        List<Row> rows = resultSet.all();
-
-        for (int i = 0; i < rows.size(); i += 3) {
-            if (i == 0) {
-                min = rows.get(i + 1).getDouble(3);
-                max = rows.get(i).getDouble(3);
-            } else {
-                if (rows.get(i + 1).getDouble(3) < min) {
-                    min = rows.get(i + 1).getDouble(3);
-                }
-                if (rows.get(i).getDouble(3) > max) {
-                    max = rows.get(i).getDouble(3);
-                }
-            }
-            mean.add(rows.get(i + 2).getDouble(3));
-        }
-        return new AggregateNumericMetric(rows.get(0).getInt(0), \
                mean.getArithmeticMean(), min, max,
-            startTime.getMillis());
-    }
-
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute6HourData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute6HourData.java
 deleted file mode 100644
index a1efab5..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute6HourData.java
                
+++ /dev/null
@@ -1,97 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.joda.time.DateTime;
-
-import org.rhq.server.metrics.domain.AggregateNumericMetric;
-import org.rhq.server.metrics.domain.AggregateType;
-import org.rhq.server.metrics.domain.MetricsTable;
-
-/**
- * @author John Sanda
- */
-public class Compute6HourData implements AsyncFunction<List<ResultSet>, \
                List<ResultSet>> {
-
-    private final Log log = LogFactory.getLog(Compute6HourData.class);
-
-    private DateTime startTime;
-
-    private RateLimiter writePermits;
-
-    private MetricsDAO dao;
-
-    private DateTime twentyFourHourTimeSlice;
-
-    public Compute6HourData(DateTime startTime, DateTime twentyFourHourTimeSlice, \
                RateLimiter writePermits,
-        MetricsDAO dao) {
-        this.startTime = startTime;
-        this.twentyFourHourTimeSlice = twentyFourHourTimeSlice;
-        this.writePermits = writePermits;
-        this.dao = dao;
-    }
-
-    @Override
-    public ListenableFuture<List<ResultSet>> apply(List<ResultSet> \
                oneHourDataResultSets) throws Exception {
-        if (log.isDebugEnabled()) {
-            log.debug("Computing and storing 6 hour data for " + \
                oneHourDataResultSets.size() + " schedules");
-        }
-        long start = System.currentTimeMillis();
-        try {
-            List<StorageResultSetFuture> insertFutures =
-                new ArrayList<StorageResultSetFuture>(oneHourDataResultSets.size());
-            for (ResultSet resultSet : oneHourDataResultSets) {
-                AggregateNumericMetric aggregate = calculateAggregate(resultSet);
-                writePermits.acquire(4);
-                insertFutures.add(dao.insertSixHourDataAsync(aggregate.getScheduleId(), \
                aggregate.getTimestamp(),
-                    AggregateType.MIN, aggregate.getMin()));
-                insertFutures.add(dao.insertSixHourDataAsync(aggregate.getScheduleId(), \
                aggregate.getTimestamp(),
-                    AggregateType.MAX, aggregate.getMax()));
-                insertFutures.add(dao.insertSixHourDataAsync(aggregate.getScheduleId(), \
                aggregate.getTimestamp(),
-                    AggregateType.AVG, aggregate.getAvg()));
-                insertFutures.add(dao.updateMetricsIndex(MetricsTable.TWENTY_FOUR_HOUR, \
                aggregate.getScheduleId(),
-                    twentyFourHourTimeSlice.getMillis()));
-            }
-            return Futures.successfulAsList(insertFutures);
-        } finally {
-            if (log.isDebugEnabled()) {
-                log.debug("Finished computing and storing 6 hour data for " + \
                oneHourDataResultSets.size() +
-                    " schedules in " + (System.currentTimeMillis() - start) + " \
                ms");
-            }
-        }
-    }
-
-    private AggregateNumericMetric calculateAggregate(ResultSet resultSet) {
-        double min = Double.NaN;
-        double max = min;
-        ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();
-        List<Row> rows = resultSet.all();
-
-        for (int i = 0; i < rows.size(); i += 3) {
-            if (i == 0) {
-                min = rows.get(i + 1).getDouble(3);
-                max = rows.get(i).getDouble(3);
-            } else {
-                if (rows.get(i + 1).getDouble(3) < min) {
-                    min = rows.get(i + 1).getDouble(3);
-                }
-                if (rows.get(i).getDouble(3) > max) {
-                    max = rows.get(i).getDouble(3);
-                }
-            }
-            mean.add(rows.get(i + 2).getDouble(3));
-        }
-        return new AggregateNumericMetric(rows.get(0).getInt(0), \
                mean.getArithmeticMean(), min, max,
-            startTime.getMillis());
-    }
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
 index 0dc4605..7f5b639 100644
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
                
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
 @@ -33,7 +33,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -56,6 +55,7 @@ import org.joda.time.Duration;
 
 import org.rhq.core.domain.measurement.MeasurementDataNumeric;
 import org.rhq.core.domain.measurement.composite.MeasurementDataNumericHighLowComposite;
 +import org.rhq.server.metrics.aggregation.Aggregator;
 import org.rhq.server.metrics.domain.AggregateNumericMetric;
 import org.rhq.server.metrics.domain.AggregateType;
 import org.rhq.server.metrics.domain.MetricsIndexEntry;
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java
 new file mode 100644
index 0000000..1698b28
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java
 @@ -0,0 +1,127 @@
package org.rhq.server.metrics.aggregation;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.datastax.driver.core.ResultSet;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.FutureFallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.rhq.core.util.exception.ThrowableUtil;
import org.rhq.server.metrics.AbortedException;
import org.rhq.server.metrics.MetricsDAO;
import org.rhq.server.metrics.StorageResultSetFuture;

/**
 * Generates 6 hour data for a batch of 1 hour data futures. After the 6 hour data is inserted
 * for the batch, aggregation of that 6 hour data into 24 hour data starts immediately for the
 * batch if the 24 hour time slice has finished.
 *
 * @see Compute6HourData
 * @author John Sanda
 */
class Aggregate1HourData implements Runnable {

    private final Log log = LogFactory.getLog(Aggregate1HourData.class);

    private MetricsDAO dao;

    /** Shared per-run aggregation state; see {@link AggregationState}. */
    private AggregationState state;

    /** The schedule ids that make up this batch. */
    private Set<Integer> scheduleIds;

    /** One in-flight query per schedule id, each returning that schedule's 1 hour data. */
    private List<StorageResultSetFuture> queryFutures;

    public Aggregate1HourData(MetricsDAO dao, AggregationState state, Set<Integer> scheduleIds,
        List<StorageResultSetFuture> queryFutures) {
        this.dao = dao;
        this.state = state;
        this.scheduleIds = scheduleIds;
        this.queryFutures = queryFutures;
    }

    @Override
    public void run() {
        final Stopwatch stopwatch = new Stopwatch().start();
        ListenableFuture<List<ResultSet>> queriesFuture = Futures.successfulAsList(queryFutures);
        // The fallback exists only to log query failures. The computation below is chained to
        // queriesFuture itself, which (being successfulAsList) completes with null entries for
        // any failed queries rather than failing outright.
        Futures.withFallback(queriesFuture, new FutureFallback<List<ResultSet>>() {
            @Override
            public ListenableFuture<List<ResultSet>> create(Throwable t) throws Exception {
                log.error("An error occurred while fetching one hour data", t);
                return Futures.immediateFailedFuture(t);
            }
        });
        ListenableFuture<List<ResultSet>> computeFutures = Futures.transform(queriesFuture,
            state.getCompute6HourData(), state.getAggregationTasks());
        // Run the callback on the aggregation tasks executor, consistent with AggregateRawData,
        // instead of on whichever thread happens to complete the future.
        Futures.addCallback(computeFutures, new FutureCallback<List<ResultSet>>() {
            @Override
            public void onSuccess(List<ResultSet> result) {
                stopwatch.stop();
                log.debug("Finished aggregating 1 hour data for " + result.size() + " schedules in " +
                    stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
                start6HourDataAggregationIfNecessary();
            }

            @Override
            public void onFailure(Throwable t) {
                if (log.isDebugEnabled()) {
                    // TODO should we log the schedule ids?
                    log.debug("Failed to aggregate 1 hour data for " + scheduleIds.size() + " schedules. An " +
                        "unexpected error occurred.", t);
                } else {
                    log.warn("Failed to aggregate 1 hour data for " + scheduleIds.size() + " schedules. An " +
                        "unexpected error occurred: " + ThrowableUtil.getRootMessage(t));
                }
                // Still run the next stage so remaining1HourData is decremented in its finally.
                start6HourDataAggregationIfNecessary();
            }
        }, state.getAggregationTasks());
    }

    /**
     * Kicks off 6 hour -> 24 hour aggregation for this batch when the 24 hour time slice has
     * finished. Always decrements the remaining 1 hour data counter, even on interrupt, so the
     * overall run can complete.
     */
    private void start6HourDataAggregationIfNecessary() {
        try {
            if (state.is24HourTimeSliceFinished()) {
                update6HourIndexEntries();
                List<StorageResultSetFuture> queryFutures = new ArrayList<StorageResultSetFuture>(scheduleIds.size());
                for (Integer scheduleId : scheduleIds) {
                    queryFutures.add(dao.findSixHourMetricsAsync(scheduleId, state.getTwentyFourHourTimeSlice().getMillis(),
                        state.getTwentyFourHourTimeSliceEnd().getMillis()));
                }
                state.getAggregationTasks().submit(new Aggregate6HourData(dao, state, scheduleIds, queryFutures));
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the call stack can observe it.
            Thread.currentThread().interrupt();
            if (log.isDebugEnabled()) {
                log.debug("An interrupt occurred while waiting for 6 hour data index entries. Aborting data aggregation",
                    e);
            } else {
                log.info("An interrupt occurred while waiting for 6 hour data index entries. Aborting data " +
                    "aggregation: " + e.getMessage());
            }
        } finally {
            state.getRemaining1HourData().addAndGet(-scheduleIds.size());
        }
    }

    /**
     * Removes this batch's schedule ids from the 6 hour index entries once they have arrived,
     * preventing duplicate tasks from being submitted for the same 6 hour data.
     */
    private void update6HourIndexEntries() throws InterruptedException {
        try {
            state.getSixHourIndexEntriesArrival().await();
            // Acquire the lock before entering the try block so the finally clause can never
            // attempt to unlock a lock this thread failed to acquire.
            state.getSixHourIndexEntriesLock().writeLock().lock();
            try {
                state.getSixHourIndexEntries().removeAll(scheduleIds);
            } finally {
                state.getSixHourIndexEntriesLock().writeLock().unlock();
            }
        } catch (AbortedException e) {
            // This means we failed to retrieve the index entries. We can however
            // continue generating 6 hour data because we do not need the index
            // here since we already have 6 hour data to aggregate along with the
            // schedule ids.
        }
    }
}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java
 new file mode 100644
index 0000000..fbd5057
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java
 @@ -0,0 +1,84 @@
package org.rhq.server.metrics.aggregation;

import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.datastax.driver.core.ResultSet;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.FutureFallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.rhq.core.util.exception.ThrowableUtil;
import org.rhq.server.metrics.MetricsDAO;
import org.rhq.server.metrics.StorageResultSetFuture;

/**
 * Generates 24 hour data for a batch of 6 hour data futures. This is the final aggregation
 * stage for the batch; whether it succeeds or fails, the remaining 6 hour data counter is
 * decremented so the overall run can complete.
 *
 * @see Compute24HourData
 * @author John Sanda
 */
class Aggregate6HourData implements Runnable {

    private final Log log = LogFactory.getLog(Aggregate6HourData.class);

    // NOTE(review): dao is retained for constructor symmetry with the other stages but is not
    // used by this final stage.
    private MetricsDAO dao;

    /** Shared per-run aggregation state; see {@link AggregationState}. */
    private AggregationState state;

    /** The schedule ids that make up this batch. */
    private Set<Integer> scheduleIds;

    /** One in-flight query per schedule id, each returning that schedule's 6 hour data. */
    private List<StorageResultSetFuture> queryFutures;

    public Aggregate6HourData(MetricsDAO dao, AggregationState state, Set<Integer> scheduleIds,
        List<StorageResultSetFuture> queryFutures) {
        this.dao = dao;
        this.state = state;
        this.scheduleIds = scheduleIds;
        this.queryFutures = queryFutures;
    }

    @Override
    public void run() {
        final Stopwatch stopwatch = new Stopwatch().start();
        ListenableFuture<List<ResultSet>> queriesFuture = Futures.successfulAsList(queryFutures);
        // The fallback exists only to log query failures; the computation below is chained to
        // queriesFuture itself, which completes with null entries for failed queries.
        Futures.withFallback(queriesFuture, new FutureFallback<List<ResultSet>>() {
            @Override
            public ListenableFuture<List<ResultSet>> create(Throwable t) throws Exception {
                log.error("An error occurred while fetching 6 hour data", t);
                return Futures.immediateFailedFuture(t);
            }
        });
        ListenableFuture<List<ResultSet>> computeFutures = Futures.transform(queriesFuture,
            state.getCompute24HourData(), state.getAggregationTasks());
        // Run the callback on the aggregation tasks executor, consistent with AggregateRawData,
        // instead of on whichever thread happens to complete the future.
        Futures.addCallback(computeFutures, new FutureCallback<List<ResultSet>>() {
            @Override
            public void onSuccess(List<ResultSet> result) {
                stopwatch.stop();
                log.debug("Finished aggregating 6 hour data for " + result.size() + " schedules in " +
                    stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
                state.getRemaining6HourData().addAndGet(-scheduleIds.size());
            }

            @Override
            public void onFailure(Throwable t) {
                if (log.isDebugEnabled()) {
                    // TODO should we log the schedule ids?
                    log.debug("Failed to aggregate 6 hour data for " + scheduleIds.size() + " schedules. An " +
                        "unexpected error occurred.", t);
                } else {
                    log.warn("Failed to aggregate 6 hour data for " + scheduleIds.size() + " schedules. An " +
                        "unexpected error occurred: " + ThrowableUtil.getRootMessage(t));
                }
                // Decrement even on failure so the overall run is never left waiting.
                state.getRemaining6HourData().addAndGet(-scheduleIds.size());
            }
        }, state.getAggregationTasks());
    }
}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java
 new file mode 100644
index 0000000..cb64fcf
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java
 @@ -0,0 +1,73 @@
package org.rhq.server.metrics.aggregation;

import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.FutureCallback;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.rhq.core.util.exception.ThrowableUtil;
import org.rhq.server.metrics.SignalingCountDownLatch;

/**
 * Callback for a metrics index query. On success it copies the schedule ids (column 1 of each
 * row) into the shared index entry set, records how many schedules remain to be processed, and
 * signals waiters that the entries have arrived. On failure it zeroes the remaining count and
 * aborts the latch so waiters do not block forever.
 *
 * @author John Sanda
 */
class AggregateIndexEntriesHandler implements FutureCallback<ResultSet> {

    private final Log log = LogFactory.getLog(AggregateIndexEntriesHandler.class);

    /** Shared set that receives the schedule ids read from the index. */
    private Set<Integer> indexEntries;

    /** Counter of schedules still awaiting aggregation; set from the number of entries loaded. */
    private AtomicInteger remainingData;

    /** Latch used to signal (or abort) the arrival of the index entries. */
    private SignalingCountDownLatch indexEntriesArrival;

    /** Started by the caller; stopped here on success to report load time. */
    private Stopwatch stopwatch;

    /** Label for the index being read, e.g. "1 hour" — used only in log messages. */
    private String src;

    /** Label for the aggregates being produced — used only in log messages. */
    private String dest;

    public AggregateIndexEntriesHandler(Set<Integer> indexEntries, AtomicInteger remainingData,
        SignalingCountDownLatch indexEntriesArrival, Stopwatch stopwatch, String src, String dest) {
        this.indexEntries = indexEntries;
        this.remainingData = remainingData;
        this.indexEntriesArrival = indexEntriesArrival;
        this.stopwatch = stopwatch;
        this.src = src;
        this.dest = dest;
    }

    @Override
    public void onSuccess(ResultSet resultSet) {
        for (Row indexRow : resultSet) {
            indexEntries.add(indexRow.getInt(1));
        }
        remainingData.set(indexEntries.size());
        // Release any tasks blocked waiting for the index entries.
        indexEntriesArrival.countDown();
        stopwatch.stop();
        if (log.isDebugEnabled()) {
            log.debug("Finished loading " + indexEntries.size() + " " + src + " index entries in " +
                stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
        }
    }

    @Override
    public void onFailure(Throwable t) {
        boolean debugEnabled = log.isDebugEnabled();
        if (debugEnabled) {
            log.debug("Some " + dest + " aggregates may not get computed. An unexpected error occurred while " +
                "retrieving " + src + " index entries", t);
        } else {
            log.warn("Some " + dest + " aggregates may not get computed. An unexpected error occurred while " +
                "retrieving " + src + " index entries: " + ThrowableUtil.getRootMessage(t));
        }
        // Nothing arrived, so nothing remains; abort the latch so waiters are released.
        remainingData.set(0);
        indexEntriesArrival.abort();
    }
}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java
 new file mode 100644
index 0000000..87a7266
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java
 @@ -0,0 +1,133 @@
package org.rhq.server.metrics.aggregation;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.datastax.driver.core.ResultSet;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.FutureFallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.rhq.core.util.exception.ThrowableUtil;
import org.rhq.server.metrics.AbortedException;
import org.rhq.server.metrics.MetricsDAO;
import org.rhq.server.metrics.StorageResultSetFuture;

/**
 * Generates 1 hour data for a batch of raw data futures. After the 1 hour data is inserted for
 * the batch, aggregation of that 1 hour data starts immediately for the batch if the 6 hour
 * time slice has finished.
 *
 * @see Compute1HourData
 * @author John Sanda
 */
class AggregateRawData implements Runnable {

    private final Log log = LogFactory.getLog(AggregateRawData.class);

    private MetricsDAO dao;

    /** Shared per-run aggregation state; see {@link AggregationState}. */
    private AggregationState state;

    /** The schedule ids that make up this batch. */
    private Set<Integer> scheduleIds;

    /** One in-flight query per schedule id, each returning that schedule's raw data. */
    private List<StorageResultSetFuture> queryFutures;

    public AggregateRawData(MetricsDAO dao, AggregationState state, Set<Integer> scheduleIds,
        List<StorageResultSetFuture> queryFutures) {
        this.dao = dao;
        this.state = state;
        this.scheduleIds = scheduleIds;
        this.queryFutures = queryFutures;
    }

    @Override
    public void run() {
        final Stopwatch stopwatch = new Stopwatch().start();
        ListenableFuture<List<ResultSet>> rawDataFutures = Futures.successfulAsList(queryFutures);
        // The fallback exists only to log query failures; the computation below is chained to
        // rawDataFutures itself, which completes with null entries for failed queries.
        Futures.withFallback(rawDataFutures, new FutureFallback<List<ResultSet>>() {
            @Override
            public ListenableFuture<List<ResultSet>> create(Throwable t) throws Exception {
                log.error("An error occurred while fetching raw data", t);
                return Futures.immediateFailedFuture(t);
            }
        });

        final ListenableFuture<List<ResultSet>> insert1HourDataFutures = Futures.transform(rawDataFutures,
            state.getCompute1HourData(), state.getAggregationTasks());
        Futures.addCallback(insert1HourDataFutures, new FutureCallback<List<ResultSet>>() {
            @Override
            public void onSuccess(List<ResultSet> resultSets) {
                stopwatch.stop();
                log.debug("Finished aggregating raw data for " + resultSets.size() + " schedules in " +
                    stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
                start1HourDataAggregationIfNecessary();
            }

            @Override
            public void onFailure(Throwable t) {
                if (log.isDebugEnabled()) {
                    // TODO should we log the schedule ids?
                    log.debug("Failed to aggregate raw data for " + scheduleIds.size() + " schedules. An unexpected " +
                        "error occurred.", t);
                } else {
                    log.warn("Failed to aggregate raw data for " + scheduleIds.size() + " schedules. An " +
                        "unexpected error occurred: " + ThrowableUtil.getRootMessage(t));
                }
                // Still run the next stage so remainingRawData is decremented in its finally.
                start1HourDataAggregationIfNecessary();
            }
        }, state.getAggregationTasks());
    }

    /**
     * Kicks off 1 hour -> 6 hour aggregation for this batch when the 6 hour time slice has
     * finished. Always decrements the remaining raw data counter, even on interrupt, so the
     * overall run can complete.
     */
    private void start1HourDataAggregationIfNecessary() {
        try {
            if (state.is6HourTimeSliceFinished()) {
                update1HourIndexEntries();
                List<StorageResultSetFuture> oneHourDataQueryFutures = new ArrayList<StorageResultSetFuture>(
                    scheduleIds.size());
                for (Integer scheduleId : scheduleIds) {
                    oneHourDataQueryFutures.add(dao.findOneHourMetricsAsync(scheduleId,
                        state.getSixHourTimeSlice().getMillis(), state.getSixHourTimeSliceEnd().getMillis()));
                }
                state.getAggregationTasks().submit(new Aggregate1HourData(dao, state, scheduleIds,
                    oneHourDataQueryFutures));
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the call stack can observe it.
            Thread.currentThread().interrupt();
            if (log.isDebugEnabled()) {
                log.debug("An interrupt occurred while waiting for 1 hour data index entries. Aborting data aggregation",
                    e);
            } else {
                log.info("An interrupt occurred while waiting for 1 hour data index entries. Aborting data " +
                    "aggregation: " + e.getMessage());
            }
        } finally {
            state.getRemainingRawData().addAndGet(-scheduleIds.size());
        }
    }

    private void update1HourIndexEntries() throws InterruptedException {
        try {
            // Wait for the arrival so that we can remove the schedules ids in this
            // batch from the one hour index entries. This will prevent duplicate tasks
            // being submitted to process the same 1 hour data.
            state.getOneHourIndexEntriesArrival().await();
            // Acquire the lock before entering the try block so the finally clause can never
            // attempt to unlock a lock this thread failed to acquire.
            state.getOneHourIndexEntriesLock().writeLock().lock();
            try {
                state.getOneHourIndexEntries().removeAll(scheduleIds);
            } finally {
                state.getOneHourIndexEntriesLock().writeLock().unlock();
            }
        } catch (AbortedException e) {
            // This means we failed to retrieve the index entries. We can however
            // continue generating 1 hour data because we do not need the index
            // here since we already have 1 hour data to aggregate along with the
            // schedule ids.
        }
    }
}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java
 new file mode 100644
index 0000000..345e53a
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java
 @@ -0,0 +1,257 @@
package org.rhq.server.metrics.aggregation;

import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.google.common.util.concurrent.ListeningExecutorService;

import org.joda.time.DateTime;

import org.rhq.server.metrics.SignalingCountDownLatch;

/**
 * Mutable holder for the state shared across a single aggregation run. It is populated via the
 * fluent setters (each returns {@code this}) and then read by the per-batch aggregation tasks.
 * Thread-safety of the individual members is the responsibility of the callers; the index entry
 * sets in particular are guarded by their companion read/write locks.
 *
 * @author John Sanda
 */
class AggregationState {

    private ListeningExecutorService aggregationTasks;

    private SignalingCountDownLatch oneHourIndexEntriesArrival;

    private SignalingCountDownLatch sixHourIndexEntriesArrival;

    private AtomicInteger remainingRawData;

    private AtomicInteger remaining1HourData;

    private AtomicInteger remaining6HourData;

    private Set<Integer> oneHourIndexEntries;

    private Set<Integer> sixHourIndexEntries;

    private ReentrantReadWriteLock oneHourIndexEntriesLock;

    private ReentrantReadWriteLock sixHourIndexEntriesLock;

    private DateTime oneHourTimeSlice;

    private DateTime sixHourTimeSlice;

    private DateTime sixHourTimeSliceEnd;

    private DateTime twentyFourHourTimeSlice;

    private DateTime twentyFourHourTimeSliceEnd;

    private boolean sixHourTimeSliceFinished;

    private boolean twentyFourHourTimeSliceFinished;

    private Compute1HourData compute1HourData;

    private Compute6HourData compute6HourData;

    private Compute24HourData compute24HourData;

    /**
     * @return The executor on which aggregation tasks and future callbacks run
     */
    public ListeningExecutorService getAggregationTasks() {
        return aggregationTasks;
    }

    public AggregationState setAggregationTasks(ListeningExecutorService aggregationTasks) {
        this.aggregationTasks = aggregationTasks;
        return this;
    }

    /**
     * @return A {@link SignalingCountDownLatch} to signal the arrival of index entries for schedules with 1 hour
     * data to be aggregated
     */
    public SignalingCountDownLatch getOneHourIndexEntriesArrival() {
        return oneHourIndexEntriesArrival;
    }

    public AggregationState setOneHourIndexEntriesArrival(SignalingCountDownLatch oneHourIndexEntriesArrival) {
        this.oneHourIndexEntriesArrival = oneHourIndexEntriesArrival;
        return this;
    }

    /**
     * @return A {@link SignalingCountDownLatch} to signal the arrival of index entries for schedules with 6 hour
     * data to be aggregated
     */
    public SignalingCountDownLatch getSixHourIndexEntriesArrival() {
        return sixHourIndexEntriesArrival;
    }

    public AggregationState setSixHourIndexEntriesArrival(SignalingCountDownLatch sixHourIndexEntriesArrival) {
        this.sixHourIndexEntriesArrival = sixHourIndexEntriesArrival;
        return this;
    }

    /**
     * @return The remaining number of schedules with raw data to be aggregated
     */
    public AtomicInteger getRemainingRawData() {
        return remainingRawData;
    }

    public AggregationState setRemainingRawData(AtomicInteger remainingRawData) {
        this.remainingRawData = remainingRawData;
        return this;
    }

    /**
     * @return The remaining number of schedules with 1 hour data to be aggregated
     */
    public AtomicInteger getRemaining1HourData() {
        return remaining1HourData;
    }

    public AggregationState setRemaining1HourData(AtomicInteger remaining1HourData) {
        this.remaining1HourData = remaining1HourData;
        return this;
    }

    /**
     * @return The remaining number of schedules with 6 hour data to be aggregated
     */
    public AtomicInteger getRemaining6HourData() {
        return remaining6HourData;
    }

    public AggregationState setRemaining6HourData(AtomicInteger remaining6HourData) {
        this.remaining6HourData = remaining6HourData;
        return this;
    }

    /**
     * @return The schedule ids with 1 hour data to be aggregated; guarded by
     * {@link #getOneHourIndexEntriesLock()}
     */
    public Set<Integer> getOneHourIndexEntries() {
        return oneHourIndexEntries;
    }

    public AggregationState setOneHourIndexEntries(Set<Integer> oneHourIndexEntries) {
        this.oneHourIndexEntries = oneHourIndexEntries;
        return this;
    }

    /**
     * @return The schedule ids with 6 hour data to be aggregated; guarded by
     * {@link #getSixHourIndexEntriesLock()}
     */
    public Set<Integer> getSixHourIndexEntries() {
        return sixHourIndexEntries;
    }

    public AggregationState setSixHourIndexEntries(Set<Integer> sixHourIndexEntries) {
        this.sixHourIndexEntries = sixHourIndexEntries;
        return this;
    }

    /**
     * @return The lock guarding {@link #getOneHourIndexEntries()}
     */
    public ReentrantReadWriteLock getOneHourIndexEntriesLock() {
        return oneHourIndexEntriesLock;
    }

    public AggregationState setOneHourIndexEntriesLock(ReentrantReadWriteLock oneHourIndexEntriesLock) {
        this.oneHourIndexEntriesLock = oneHourIndexEntriesLock;
        return this;
    }

    /**
     * @return The lock guarding {@link #getSixHourIndexEntries()}
     */
    public ReentrantReadWriteLock getSixHourIndexEntriesLock() {
        return sixHourIndexEntriesLock;
    }

    public AggregationState setSixHourIndexEntriesLock(ReentrantReadWriteLock sixHourIndexEntriesLock) {
        this.sixHourIndexEntriesLock = sixHourIndexEntriesLock;
        return this;
    }

    /**
     * @return The start of the 1 hour time slice being aggregated
     */
    public DateTime getOneHourTimeSlice() {
        return oneHourTimeSlice;
    }

    public AggregationState setOneHourTimeSlice(DateTime oneHourTimeSlice) {
        this.oneHourTimeSlice = oneHourTimeSlice;
        return this;
    }

    /**
     * @return The start of the 6 hour time slice being aggregated
     */
    public DateTime getSixHourTimeSlice() {
        return sixHourTimeSlice;
    }

    public AggregationState setSixHourTimeSlice(DateTime sixHourTimeSlice) {
        this.sixHourTimeSlice = sixHourTimeSlice;
        return this;
    }

    /**
     * @return The end of the 6 hour time slice being aggregated
     */
    public DateTime getSixHourTimeSliceEnd() {
        return sixHourTimeSliceEnd;
    }

    public AggregationState setSixHourTimeSliceEnd(DateTime sixHourTimeSliceEnd) {
        this.sixHourTimeSliceEnd = sixHourTimeSliceEnd;
        return this;
    }

    /**
     * @return The start of the 24 hour time slice being aggregated
     */
    public DateTime getTwentyFourHourTimeSlice() {
        return twentyFourHourTimeSlice;
    }

    public AggregationState setTwentyFourHourTimeSlice(DateTime twentyFourHourTimeSlice) {
        this.twentyFourHourTimeSlice = twentyFourHourTimeSlice;
        return this;
    }

    /**
     * @return The end of the 24 hour time slice being aggregated
     */
    public DateTime getTwentyFourHourTimeSliceEnd() {
        return twentyFourHourTimeSliceEnd;
    }

    public AggregationState setTwentyFourHourTimeSliceEnd(DateTime twentyFourHourTimeSliceEnd) {
        this.twentyFourHourTimeSliceEnd = twentyFourHourTimeSliceEnd;
        return this;
    }

    /**
     * @return true if the 6 hour time slice has ended, meaning 1 hour data should be rolled up
     */
    public boolean is6HourTimeSliceFinished() {
        return sixHourTimeSliceFinished;
    }

    public AggregationState set6HourTimeSliceFinished(boolean is6HourTimeSliceFinished) {
        this.sixHourTimeSliceFinished = is6HourTimeSliceFinished;
        return this;
    }

    /**
     * @return true if the 24 hour time slice has ended, meaning 6 hour data should be rolled up
     */
    public boolean is24HourTimeSliceFinished() {
        return twentyFourHourTimeSliceFinished;
    }

    public AggregationState set24HourTimeSliceFinished(boolean is24HourTimeSliceFinished) {
        this.twentyFourHourTimeSliceFinished = is24HourTimeSliceFinished;
        return this;
    }

    /**
     * @return The function that computes and stores 1 hour data from raw data result sets
     */
    public Compute1HourData getCompute1HourData() {
        return compute1HourData;
    }

    public AggregationState setCompute1HourData(Compute1HourData compute1HourData) {
        this.compute1HourData = compute1HourData;
        return this;
    }

    /**
     * @return The function that computes and stores 6 hour data from 1 hour data result sets
     */
    public Compute6HourData getCompute6HourData() {
        return compute6HourData;
    }

    public AggregationState setCompute6HourData(Compute6HourData compute6HourData) {
        this.compute6HourData = compute6HourData;
        return this;
    }

    /**
     * @return The function that computes and stores 24 hour data from 6 hour data result sets
     */
    public Compute24HourData getCompute24HourData() {
        return compute24HourData;
    }

    public AggregationState setCompute24HourData(Compute24HourData compute24HourData) {
        this.compute24HourData = compute24HourData;
        return this;
    }
}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
 new file mode 100644
index 0000000..bf0bc1a
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
 @@ -0,0 +1,378 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeComparator;
+import org.joda.time.Duration;
+
+import org.rhq.core.util.exception.ThrowableUtil;
+import org.rhq.server.metrics.AbortedException;
+import org.rhq.server.metrics.DateTimeService;
+import org.rhq.server.metrics.MetricsConfiguration;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.SignalingCountDownLatch;
+import org.rhq.server.metrics.StorageResultSetFuture;
+import org.rhq.server.metrics.domain.AggregateNumericMetric;
+import org.rhq.server.metrics.domain.MetricsTable;
+
+/**
+ * This class provides the main interface for metric data aggregation.
+ *
+ * @author John Sanda
+ */
+public class Aggregator {
+
+    private static final Comparator<AggregateNumericMetric> AGGREGATE_COMPARATOR = \
new Comparator<AggregateNumericMetric>() { +        @Override
+        public int compare(AggregateNumericMetric left, AggregateNumericMetric \
right) { +            return (left.getScheduleId() < right.getScheduleId()) ? -1 : \
((left.getScheduleId() == right.getScheduleId()) ? 0 : 1); +        }
+    };
+
+    private final Log log = LogFactory.getLog(Aggregator.class);
+
+    private MetricsDAO dao;
+
+    private MetricsConfiguration configuration;
+
+    private DateTimeService dtService;
+
+    private DateTime startTime;
+
+    /**
+     * Signals when raw data index entries (in metrics_index) can be deleted. We \
cannot delete the row in metrics_index +     * until we know that it has been read, \
successfully or otherwise. +     */
+    private SignalingCountDownLatch rawDataIndexEntriesArrival;
+
+    private RateLimiter readPermits;
+    private RateLimiter writePermits;
+
+    private int batchSize;
+
+    private AggregationState state;
+
+    private Set<AggregateNumericMetric> oneHourData;
+
+    private AtomicInteger remainingIndexEntries;
+
+    public Aggregator(ListeningExecutorService aggregationTasks, MetricsDAO dao, \
MetricsConfiguration configuration, +        DateTimeService dtService, DateTime \
startTime, int batchSize, RateLimiter writePermits, +        RateLimiter readPermits) \
{ +        this.dao = dao;
+        this.configuration = configuration;
+        this.dtService = dtService;
+        this.startTime = startTime;
+        this.readPermits = readPermits;
+        this.writePermits = writePermits;
+        this.batchSize = batchSize;
+        oneHourData = new \
ConcurrentSkipListSet<AggregateNumericMetric>(AGGREGATE_COMPARATOR); +        \
rawDataIndexEntriesArrival = new SignalingCountDownLatch(new CountDownLatch(1)); +    \
remainingIndexEntries = new AtomicInteger(1); +
+        DateTime sixHourTimeSlice = get6HourTimeSlice();
+        DateTime twentyFourHourTimeSlice = get24HourTimeSlice();
+
+        state = new AggregationState()
+            .setAggregationTasks(aggregationTasks)
+            .setOneHourTimeSlice(startTime)
+            .setSixHourTimeSlice(sixHourTimeSlice)
+            .setSixHourTimeSliceEnd(sixHourTimeSlice.plus(configuration.getOneHourTimeSliceDuration()))
 +            .setTwentyFourHourTimeSlice(twentyFourHourTimeSlice)
+            .setTwentyFourHourTimeSliceEnd(twentyFourHourTimeSlice.plus(configuration.getSixHourTimeSliceDuration()))
 +            .setCompute1HourData(new Compute1HourData(startTime, sixHourTimeSlice, \
writePermits, dao, oneHourData)) +            .setCompute6HourData(new \
Compute6HourData(sixHourTimeSlice, twentyFourHourTimeSlice, writePermits, dao)) +     \
.setCompute24HourData(new Compute24HourData(twentyFourHourTimeSlice, writePermits, \
dao)) +            .set6HourTimeSliceFinished(hasTimeSliceEnded(sixHourTimeSlice, \
configuration.getOneHourTimeSliceDuration())) +            \
.set24HourTimeSliceFinished(hasTimeSliceEnded(twentyFourHourTimeSlice, +              \
configuration.getSixHourTimeSliceDuration())) +            .setRemainingRawData(new \
AtomicInteger(0)) +            .setRemaining1HourData(new AtomicInteger(0))
+            .setRemaining6HourData(new AtomicInteger(0))
+            .setOneHourIndexEntries(new TreeSet<Integer>())
+            .setSixHourIndexEntries(new TreeSet<Integer>())
+            .setOneHourIndexEntriesLock(new ReentrantReadWriteLock())
+            .setSixHourIndexEntriesLock(new ReentrantReadWriteLock());
+
+        if (state.is6HourTimeSliceFinished()) {
+            state.setOneHourIndexEntriesArrival(new SignalingCountDownLatch(new \
CountDownLatch(1))); +            remainingIndexEntries.incrementAndGet();
+        } else {
+            state.setOneHourIndexEntriesArrival(new SignalingCountDownLatch(new \
CountDownLatch(0))); +            state.setRemaining1HourData(new AtomicInteger(0));
+        }
+
+        if (state.is24HourTimeSliceFinished()) {
+            state.setSixHourIndexEntriesArrival(new SignalingCountDownLatch(new \
CountDownLatch(1))); +            remainingIndexEntries.incrementAndGet();
+        } else {
+            state.setSixHourIndexEntriesArrival(new SignalingCountDownLatch(new \
CountDownLatch(0))); +            state.setRemaining6HourData(new AtomicInteger(0));
+        }
+    }
+
+    private DateTime get24HourTimeSlice() {
+        return dtService.getTimeSlice(startTime, \
configuration.getSixHourTimeSliceDuration()); +    }
+
+    private DateTime get6HourTimeSlice() {
+        return dtService.getTimeSlice(startTime, \
configuration.getOneHourTimeSliceDuration()); +    }
+
+    private boolean hasTimeSliceEnded(DateTime startTime, Duration duration) {
+        DateTime endTime = startTime.plus(duration);
+        return DateTimeComparator.getInstance().compare(currentHour(), endTime) >= \
0; +    }
+
+    protected DateTime currentHour() {
+        return dtService.getTimeSlice(dtService.now(), \
configuration.getRawTimeSliceDuration()); +    }
+
+    public Set<AggregateNumericMetric> run() {
+        log.info("Starting aggregation for time slice " + startTime);
+        readPermits.acquire();
+        StorageResultSetFuture rawFuture = \
dao.findMetricsIndexEntriesAsync(MetricsTable.ONE_HOUR, +            \
startTime.getMillis()); +        Futures.addCallback(rawFuture, new \
FutureCallback<ResultSet>() { +            @Override
+            public void onSuccess(ResultSet result) {
+                List<Row> rows = result.all();
+                state.getRemainingRawData().set(rows.size());
+                rawDataIndexEntriesArrival.countDown();
+
+                Stopwatch stopwatch = new Stopwatch().start();
+
+                final DateTime endTime = \
startTime.plus(configuration.getRawTimeSliceDuration()); +                \
Set<Integer> scheduleIds = new TreeSet<Integer>(); +                \
List<StorageResultSetFuture> rawDataFutures = new \
ArrayList<StorageResultSetFuture>(batchSize); +                for (final Row row : \
rows) { +                    scheduleIds.add(row.getInt(1));
+                    readPermits.acquire();
+                    rawDataFutures.add(dao.findRawMetricsAsync(row.getInt(1), \
startTime.getMillis(), +                        endTime.getMillis()));
+                    if (rawDataFutures.size() == batchSize) {
+                        state.getAggregationTasks().submit(new AggregateRawData(dao, \
state, scheduleIds, +                            rawDataFutures));
+                        rawDataFutures = new ArrayList<StorageResultSetFuture>();
+                        scheduleIds = new TreeSet<Integer>();
+                    }
+                }
+                if (!rawDataFutures.isEmpty()) {
+                    state.getAggregationTasks().submit(new AggregateRawData(dao, \
state, scheduleIds, +                        rawDataFutures));
+                }
+
+                if (log.isDebugEnabled()) {
+                    stopwatch.stop();
+                    log.debug("Finished scheduling raw data aggregation tasks for " \
+ rows.size() + " schedules in " + +                        \
stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms"); +                }
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Aggregation for time slice [" + startTime + "] cannot \
proceed. There was an " + +                        "unexpected error while retrieving \
raw data index entries.", t); +                } else {
+                    log.warn("Aggregation for time slice [" + startTime + "] cannot \
proceed. There was an " + +                        "unexpected error while retrieving \
raw data index entries: " + ThrowableUtil.getRootMessage(t)); +                }
+                state.setRemainingRawData(new AtomicInteger(0));
+                rawDataIndexEntriesArrival.abort();
+                deleteIndexEntries(MetricsTable.ONE_HOUR);
+            }
+        }, state.getAggregationTasks());
+
+        if (state.is6HourTimeSliceFinished()) {
+            log.debug("Fetching 1 hour index entries");
+            Stopwatch stopwatch = new Stopwatch().start();
+            StorageResultSetFuture oneHourFuture = \
dao.findMetricsIndexEntriesAsync(MetricsTable.SIX_HOUR, +                \
state.getSixHourTimeSlice().getMillis()); +            \
Futures.addCallback(oneHourFuture, new \
AggregateIndexEntriesHandler(state.getOneHourIndexEntries(), +                \
state.getRemaining1HourData(), state.getOneHourIndexEntriesArrival(), stopwatch, "1 \
hour", "6 hour"), +                state.getAggregationTasks());
+        }
+
+        if (state.is24HourTimeSliceFinished()) {
+            log.debug("Fetching 6 hour index entries");
+            Stopwatch stopwatch = new Stopwatch().start();
+            StorageResultSetFuture sixHourFuture = \
dao.findMetricsIndexEntriesAsync(MetricsTable.TWENTY_FOUR_HOUR, +                \
state.getTwentyFourHourTimeSlice().getMillis()); +            \
Futures.addCallback(sixHourFuture, new \
AggregateIndexEntriesHandler(state.getSixHourIndexEntries(), +                \
state.getRemaining6HourData(), state.getSixHourIndexEntriesArrival(), stopwatch, "6 \
hour", "24 hour"), +                state.getAggregationTasks());
+        }
+
+        try {
+            try {
+                rawDataIndexEntriesArrival.await();
+                deleteIndexEntries(MetricsTable.ONE_HOUR);
+            } catch (AbortedException e) {
+            }
+
+            if (state.is6HourTimeSliceFinished()) {
+                waitFor(state.getRemainingRawData());
+                try {
+                    state.getOneHourIndexEntriesArrival().await();
+                    deleteIndexEntries(MetricsTable.SIX_HOUR);
+
+                    List<StorageResultSetFuture> queryFutures = new \
ArrayList<StorageResultSetFuture>(batchSize); +                    Set<Integer> \
scheduleIds = new TreeSet<Integer>(); +                    \
state.getOneHourIndexEntriesLock().writeLock().lock(); +                    \
log.debug("Preparing to submit 1 hour data aggregation tasks for " + +                \
state.getOneHourIndexEntries().size() + " schedules"); +                    for \
(Integer scheduleId : state.getOneHourIndexEntries()) { +                        \
queryFutures.add(dao.findOneHourMetricsAsync(scheduleId, \
state.getSixHourTimeSlice().getMillis(), +                            \
state.getSixHourTimeSliceEnd().getMillis())); +                        \
scheduleIds.add(scheduleId); +                        if (queryFutures.size() == \
batchSize) { +                            state.getAggregationTasks().submit(new \
Aggregate1HourData(dao, state, scheduleIds, +                                \
queryFutures)); +                            queryFutures = new \
ArrayList<StorageResultSetFuture>(batchSize); +                            \
scheduleIds = new TreeSet<Integer>(); +                        }
+                    }
+                    if (!queryFutures.isEmpty()) {
+                        state.getAggregationTasks().submit(new \
Aggregate1HourData(dao, state, scheduleIds, +                            \
queryFutures)); +                        queryFutures = null;
+                        scheduleIds = null;
+                    }
+                } catch (AbortedException e) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Some 6 hour aggregates may not get generated. \
There was an unexpected error while " + +                            "loading 1 hour \
index entries", e); +                    } else {
+                        log.warn("Some 6 hour aggregates may not get generated. \
There was an unexpected error while " + +                            "loading 1 hour \
index entries: " + ThrowableUtil.getRootMessage(e)); +                    }
+                } finally {
+                    state.getOneHourIndexEntriesLock().writeLock().unlock();
+                }
+            }
+
+            if (state.is24HourTimeSliceFinished()) {
+                waitFor(state.getRemaining1HourData());
+                try {
+                    state.getSixHourIndexEntriesArrival().await();
+                    deleteIndexEntries(MetricsTable.TWENTY_FOUR_HOUR);
+
+                    List<StorageResultSetFuture> queryFutures = new \
ArrayList<StorageResultSetFuture>(batchSize); +                    Set<Integer> \
scheduleIds = new TreeSet<Integer>(); +                    \
state.getSixHourIndexEntriesLock().writeLock().lock(); +                    \
log.debug("Preparing to submit 6 hour data aggregation tasks for " + +                \
state.getSixHourIndexEntries().size() + " schedules"); +                    for \
(Integer scheduleId : state.getSixHourIndexEntries()) { +                        \
queryFutures.add(dao.findSixHourMetricsAsync(scheduleId, \
state.getTwentyFourHourTimeSlice().getMillis(), +                            \
state.getTwentyFourHourTimeSliceEnd().getMillis())); +                        \
scheduleIds.add(scheduleId); +                        if (queryFutures.size() == \
batchSize) { +                            state.getAggregationTasks().submit(new \
Aggregate6HourData(dao, state, scheduleIds, +                                \
queryFutures)); +                            queryFutures = new \
ArrayList<StorageResultSetFuture>(batchSize); +                            \
scheduleIds = new TreeSet<Integer>(); +                        }
+                    }
+                    if (!queryFutures.isEmpty()) {
+                        state.getAggregationTasks().submit(new \
Aggregate6HourData(dao, state, scheduleIds, +                            \
queryFutures)); +                        queryFutures = null;
+                        scheduleIds = null;
+                    }
+                } catch (AbortedException e) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Some 24 hour aggregates may not get generated. \
There was an unexpected error while " + +                            "loading 6 hour \
index entries", e); +                    } else {
+                        log.warn("Some 24 hour aggregates may not get generated. \
There was an unexpected error while " + +                            "loading 6 hour \
index entries: " + ThrowableUtil.getRootMessage(e)); +                    }
+                } finally {
+                    state.getSixHourIndexEntriesLock().writeLock().unlock();
+                }
+            }
+
+            while (!isAggregationFinished()) {
+                Thread.sleep(50);
+            }
+        } catch (InterruptedException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("An interrupt occurred while waiting for aggregation to \
finish. Aborting remaining work.", e); +            } else {
+                log.warn("An interrupt occurred while waiting for aggregation to \
finish. Aborting remaining work: " + +                    \
ThrowableUtil.getRootMessage(e)); +            }
+            log.warn("An interrupt occurred while waiting for aggregation to \
finish", e); +        }
+        return oneHourData;
+    }
+
+    private void waitFor(AtomicInteger remainingData) throws InterruptedException {
+        while (remainingData.get() > 0) {
+            Thread.sleep(50);
+        }
+    }
+
+    private boolean isAggregationFinished() {
+        return state.getRemainingRawData().get() <= 0 && \
state.getRemaining1HourData().get() <= 0 && +            \
state.getRemaining6HourData().get() <= 0 && remainingIndexEntries.get() <= 0; +    }
+
+    private void deleteIndexEntries(final MetricsTable table) {
+        final DateTime time;
+        switch (table) {
+        case ONE_HOUR:
+            time = startTime;
+            break;
+        case SIX_HOUR:
+            time = state.getSixHourTimeSlice();
+            break;
+        default:
+            time = state.getTwentyFourHourTimeSlice();
+            break;
+        }
+        log.debug("Deleting " + table + " index entries for time slice " + time);
+        writePermits.acquire();
+        StorageResultSetFuture future = dao.deleteMetricsIndexEntriesAsync(table, \
time.getMillis()); +        Futures.addCallback(future, new \
FutureCallback<ResultSet>() { +            @Override
+            public void onSuccess(ResultSet result) {
+                remainingIndexEntries.decrementAndGet();
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Failed to delete index entries for table " + table + \
" at time [" + time + "]. An " + +                        "unexpected error \
occurred.", t); +                } else {
+                    log.warn("Failed to delete index entries for table " + table + " \
at time [" + time + "]. An " + +                        "unexpected error occurred: " \
+ ThrowableUtil.getRootMessage(t)); +                }
+                remainingIndexEntries.decrementAndGet();
+            }
+        });
+    }
+
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute1HourData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute1HourData.java
 new file mode 100644
index 0000000..f130f75
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute1HourData.java
 @@ -0,0 +1,113 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+
+import org.rhq.server.metrics.ArithmeticMeanCalculator;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.StorageResultSetFuture;
+import org.rhq.server.metrics.domain.AggregateNumericMetric;
+import org.rhq.server.metrics.domain.AggregateType;
+import org.rhq.server.metrics.domain.MetricsTable;
+
+/**
+ * Computes 1 hour data for a batch of raw data result sets. The generated 1 hour \
aggregates are inserted along with + * their corresponding index updates.
+ *
+ * @author John Sanda
+ */
+class Compute1HourData implements AsyncFunction<List<ResultSet>, List<ResultSet>> {
+
+    private final Log log = LogFactory.getLog(Compute1HourData.class);
+
+    private DateTime startTime;
+
+    private RateLimiter writePermits;
+
+    private MetricsDAO dao;
+
+    private DateTime sixHourTimeSlice;
+
+    private Set<AggregateNumericMetric> oneHourData;
+
+    public Compute1HourData(DateTime startTime, DateTime sixHourTimeSlice, \
RateLimiter writePermits, MetricsDAO dao, +        Set<AggregateNumericMetric> \
oneHourData) { +        this.startTime = startTime;
+        this.sixHourTimeSlice = sixHourTimeSlice;
+        this.writePermits = writePermits;
+        this.dao = dao;
+        this.oneHourData = oneHourData;
+    }
+
+    @Override
+    public ListenableFuture<List<ResultSet>> apply(List<ResultSet> \
rawDataResultSets) throws Exception { +        if (log.isDebugEnabled()) {
+            log.debug("Computing and storing 1 hour data for " + \
rawDataResultSets.size() + " schedules"); +        }
+        Stopwatch stopwatch = new Stopwatch().start();
+        try {
+            List<StorageResultSetFuture> insertFutures = new \
ArrayList<StorageResultSetFuture>(rawDataResultSets.size()); +            for \
(ResultSet resultSet : rawDataResultSets) { +                AggregateNumericMetric \
aggregate = calculateAggregatedRaw(resultSet); +                \
oneHourData.add(aggregate); +                writePermits.acquire(4);
+                insertFutures.add(dao.insertOneHourDataAsync(aggregate.getScheduleId(), \
aggregate.getTimestamp(), +                    AggregateType.MIN, \
aggregate.getMin())); +                \
insertFutures.add(dao.insertOneHourDataAsync(aggregate.getScheduleId(), \
aggregate.getTimestamp(), +                    AggregateType.MAX, \
aggregate.getMax())); +                \
insertFutures.add(dao.insertOneHourDataAsync(aggregate.getScheduleId(), \
aggregate.getTimestamp(), +                    AggregateType.AVG, \
aggregate.getAvg())); +                \
insertFutures.add(dao.updateMetricsIndex(MetricsTable.SIX_HOUR, \
aggregate.getScheduleId(), +                    sixHourTimeSlice.getMillis()));
+            }
+            return Futures.successfulAsList(insertFutures);
+        } finally {
+            if (log.isDebugEnabled()) {
+                stopwatch.stop();
+                log.debug("Finished computing and storing 1 hour data for " + \
rawDataResultSets.size() + +                    " schedules in " + \
stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms"); +            }
+        }
+    }
+
+    private AggregateNumericMetric calculateAggregatedRaw(ResultSet resultSet) {
+        double min = Double.NaN;
+        double max = min;
+        int count = 0;
+        ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();
+        double value;
+        List<Row> rows = resultSet.all();
+
+        for (Row row : rows) {
+            value = row.getDouble(2);
+            if (count == 0) {
+                min = value;
+                max = min;
+            }
+            if (value < min) {
+                min = value;
+            } else if (value > max) {
+                max = value;
+            }
+            mean.add(value);
+            ++count;
+        }
+
+        return new AggregateNumericMetric(rows.get(0).getInt(0), \
mean.getArithmeticMean(), min, max, +            startTime.getMillis());
+    }
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute24HourData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute24HourData.java
 new file mode 100644
index 0000000..6fe9d79
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute24HourData.java
 @@ -0,0 +1,99 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+
+import org.rhq.server.metrics.ArithmeticMeanCalculator;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.StorageResultSetFuture;
+import org.rhq.server.metrics.domain.AggregateNumericMetric;
+import org.rhq.server.metrics.domain.AggregateType;
+
+/**
+ * Computes 24 hour data for a batch of raw data result sets. The generated 6 hour \
aggregates are inserted. + *
+ * @author John Sanda
+ */
+class Compute24HourData implements AsyncFunction<List<ResultSet>, List<ResultSet>> {
+
+    private final Log log = LogFactory.getLog(Compute24HourData.class);
+
+    private DateTime startTime;
+
+    private RateLimiter writePermits;
+
+    private MetricsDAO dao;
+
+    public Compute24HourData(DateTime startTime, RateLimiter writePermits, \
MetricsDAO dao) { +        this.startTime = startTime;
+        this.writePermits = writePermits;
+        this.dao = dao;
+    }
+
+    @Override
+    public ListenableFuture<List<ResultSet>> apply(List<ResultSet> \
sixHourDataResultSets) throws Exception { +        if (log.isDebugEnabled()) {
+            log.debug("Computing and storing 24 hour data for " + \
sixHourDataResultSets.size() + " schedules"); +        }
+        Stopwatch stopwatch = new Stopwatch().start();
+        try {
+            List<StorageResultSetFuture> insertFutures =
+                new ArrayList<StorageResultSetFuture>(sixHourDataResultSets.size());
+            for (ResultSet resultSet : sixHourDataResultSets) {
+                AggregateNumericMetric aggregate = calculateAggregate(resultSet);
+                writePermits.acquire(3);
+                insertFutures.add(dao.insertTwentyFourHourDataAsync(aggregate.getScheduleId(), \
aggregate.getTimestamp(), +                    AggregateType.MIN, \
aggregate.getMin())); +                \
insertFutures.add(dao.insertTwentyFourHourDataAsync(aggregate.getScheduleId(), \
aggregate.getTimestamp(), +                    AggregateType.MAX, \
aggregate.getMax())); +                \
insertFutures.add(dao.insertTwentyFourHourDataAsync(aggregate.getScheduleId(), \
aggregate.getTimestamp(), +                    AggregateType.AVG, \
aggregate.getAvg())); +            }
+            return Futures.successfulAsList(insertFutures);
+        } finally {
+            if (log.isDebugEnabled()) {
+                stopwatch.stop();
+                log.debug("Finished computing and storing 24 hour data for " + \
sixHourDataResultSets.size() + +                    " schedules in " + \
stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms"); +            }
+        }
+    }
+
+    private AggregateNumericMetric calculateAggregate(ResultSet resultSet) {
+        double min = Double.NaN;
+        double max = min;
+        ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();
+        List<Row> rows = resultSet.all();
+
+        for (int i = 0; i < rows.size(); i += 3) {
+            if (i == 0) {
+                min = rows.get(i + 1).getDouble(3);
+                max = rows.get(i).getDouble(3);
+            } else {
+                if (rows.get(i + 1).getDouble(3) < min) {
+                    min = rows.get(i + 1).getDouble(3);
+                }
+                if (rows.get(i).getDouble(3) > max) {
+                    max = rows.get(i).getDouble(3);
+                }
+            }
+            mean.add(rows.get(i + 2).getDouble(3));
+        }
+        return new AggregateNumericMetric(rows.get(0).getInt(0), \
mean.getArithmeticMean(), min, max, +            startTime.getMillis());
+    }
+
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute6HourData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute6HourData.java
 new file mode 100644
index 0000000..ec1ee26
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute6HourData.java
 @@ -0,0 +1,106 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+
+import org.rhq.server.metrics.ArithmeticMeanCalculator;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.StorageResultSetFuture;
+import org.rhq.server.metrics.domain.AggregateNumericMetric;
+import org.rhq.server.metrics.domain.AggregateType;
+import org.rhq.server.metrics.domain.MetricsTable;
+
+/**
+ * Computes 6 hour data for a batch of raw data result sets. The generated 6 hour \
aggregates are inserted along with + * their corresponding index updates.
+ *
+ * @author John Sanda
+ */
+class Compute6HourData implements AsyncFunction<List<ResultSet>, List<ResultSet>> {
+
+    private final Log log = LogFactory.getLog(Compute6HourData.class);
+
+    private DateTime startTime;
+
+    private RateLimiter writePermits;
+
+    private MetricsDAO dao;
+
+    private DateTime twentyFourHourTimeSlice;
+
+    public Compute6HourData(DateTime startTime, DateTime twentyFourHourTimeSlice, \
RateLimiter writePermits, +        MetricsDAO dao) {
+        this.startTime = startTime;
+        this.twentyFourHourTimeSlice = twentyFourHourTimeSlice;
+        this.writePermits = writePermits;
+        this.dao = dao;
+    }
+
+    @Override
+    public ListenableFuture<List<ResultSet>> apply(List<ResultSet> \
oneHourDataResultSets) throws Exception { +        if (log.isDebugEnabled()) {
+            log.debug("Computing and storing 6 hour data for " + \
oneHourDataResultSets.size() + " schedules"); +        }
+        Stopwatch stopwatch = new Stopwatch().start();
+        try {
+            List<StorageResultSetFuture> insertFutures =
+                new ArrayList<StorageResultSetFuture>(oneHourDataResultSets.size());
+            for (ResultSet resultSet : oneHourDataResultSets) {
+                AggregateNumericMetric aggregate = calculateAggregate(resultSet);
+                writePermits.acquire(4);
+                insertFutures.add(dao.insertSixHourDataAsync(aggregate.getScheduleId(), \
aggregate.getTimestamp(), +                    AggregateType.MIN, \
aggregate.getMin())); +                \
insertFutures.add(dao.insertSixHourDataAsync(aggregate.getScheduleId(), \
aggregate.getTimestamp(), +                    AggregateType.MAX, \
aggregate.getMax())); +                \
insertFutures.add(dao.insertSixHourDataAsync(aggregate.getScheduleId(), \
aggregate.getTimestamp(), +                    AggregateType.AVG, \
aggregate.getAvg())); +                \
insertFutures.add(dao.updateMetricsIndex(MetricsTable.TWENTY_FOUR_HOUR, \
aggregate.getScheduleId(), +                    \
twentyFourHourTimeSlice.getMillis())); +            }
+            return Futures.successfulAsList(insertFutures);
+        } finally {
+            if (log.isDebugEnabled()) {
+                stopwatch.stop();
+                log.debug("Finished computing and storing 6 hour data for " + \
oneHourDataResultSets.size() + +                    " schedules in " + \
stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms"); +            }
+        }
+    }
+
+    private AggregateNumericMetric calculateAggregate(ResultSet resultSet) {
+        double min = Double.NaN;
+        double max = min;
+        ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();
+        List<Row> rows = resultSet.all();
+
+        for (int i = 0; i < rows.size(); i += 3) {
+            if (i == 0) {
+                min = rows.get(i + 1).getDouble(3);
+                max = rows.get(i).getDouble(3);
+            } else {
+                if (rows.get(i + 1).getDouble(3) < min) {
+                    min = rows.get(i + 1).getDouble(3);
+                }
+                if (rows.get(i).getDouble(3) > max) {
+                    max = rows.get(i).getDouble(3);
+                }
+            }
+            mean.add(rows.get(i + 2).getDouble(3));
+        }
+        return new AggregateNumericMetric(rows.get(0).getInt(0), \
mean.getArithmeticMean(), min, max, +            startTime.getMillis());
+    }
+}


_______________________________________________
rhq-commits mailing list
rhq-commits@lists.fedorahosted.org
https://lists.fedorahosted.org/mailman/listinfo/rhq-commits


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic