[prev in list] [next in list] [prev in thread] [next in thread] 

List:       rhq-commits
Subject:    [rhq] modules/common
From:       snegrea <snegrea () fedoraproject ! org>
Date:       2013-05-31 22:31:15
Message-ID: 20130531223115.792E760FCF () fedorahosted ! org
[Download RAW message or body]

 modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/AbstractManager.java |  251 +++++++
 modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/SchemaManager.java   |  319 ----------
 modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/TopologyManager.java |  130 ++++
 modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/VersionManager.java  |  149 ++++
 modules/common/cassandra-schema/src/main/resources/topology/0001.xml                        |   13
 modules/common/cassandra-schema/src/main/resources/topology/0002.xml                        |   38 +
 6 files changed, 596 insertions(+), 304 deletions(-)

New commits:
commit 245357d903f998c35f093387d7ba7824ade52968
Author: Stefan Negrea <snegrea@redhat.com>
Date:   Fri May 31 17:30:46 2013 -0500

    Add topology management to the schema manager. No change to the external interface for the schema manager.
    The topology update functionality includes updating the replication factor and the GC grace period.

diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/AbstractManager.java \
b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/AbstractManager.java
 new file mode 100644
index 0000000..be69c08
--- /dev/null
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/AbstractManager.java
 @@ -0,0 +1,251 @@
+/*
+ *
+ *  * RHQ Management Platform
+ *  * Copyright (C) 2005-2012 Red Hat, Inc.
+ *  * All rights reserved.
+ *  *
+ *  * This program is free software; you can redistribute it and/or modify
+ *  * it under the terms of the GNU General Public License, version 2, as
+ *  * published by the Free Software Foundation, and/or the GNU Lesser
+ *  * General Public License, version 2.1, also as published by the Free
+ *  * Software Foundation.
+ *  *
+ *  * This program is distributed in the hope that it will be useful,
+ *  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ *  * GNU General Public License and the GNU Lesser General Public License
+ *  * for more details.
+ *  *
+ *  * You should have received a copy of the GNU General Public License
+ *  * and the GNU Lesser General Public License along with this program;
+ *  * if not, write to the Free Software Foundation, Inc.,
+ *  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ *
+ */
+
+package org.rhq.cassandra.schema;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ProtocolOptions.Compression;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+import org.rhq.cassandra.util.ClusterBuilder;
+import org.rhq.core.domain.cloud.StorageNode;
+import org.rhq.core.util.StringUtil;
+
+/**
+ * @author Stefan Negrea
+ */
+public class AbstractManager {
+
+    private static final String UPDATE_PLAN_ELEMENT = "updatePlan";
+    private static final String STEP_ELEMENT = "step";
+
+    private static final String SCHEMA_EXISTS_QUERY = "SELECT * FROM \
system.schema_keyspaces WHERE keyspace_name = 'rhq';"; +    private static final \
String VERSION_COLUMNFAMILY_EXISTS_QUERY = "SELECT * from \
system.schema_columnfamilies WHERE keyspace_name='rhq' AND \
columnfamily_name='schema_version';"; +    private static final String VERSION_QUERY \
= "SELECT version FROM rhq.schema_version"; +
+
+    private final Log log = LogFactory.getLog(AbstractManager.class);
+
+    protected Session session;
+    protected final String username;
+    protected final String password;
+    protected List<StorageNode> nodes = new ArrayList<StorageNode>();
+
+    public AbstractManager(String username, String password, List<StorageNode> \
nodes) { +        try {
+            this.username = username;
+            this.password = password;
+            this.nodes = nodes;
+
+            this.initCluster();
+            this.shutdown();
+        } catch (NoHostAvailableException e) {
+            throw new RuntimeException("Unable create storage node session.", e);
+        }
+    }
+
+    protected boolean schemaExists() { // True only when both the 'rhq' keyspace and its schema_version column family are found.
+        try {
+            ResultSet resultSet = session.execute(SCHEMA_EXISTS_QUERY); // first check: is there a row for keyspace 'rhq' in system.schema_keyspaces
+            if (!resultSet.all().isEmpty()) {
+                resultSet = session.execute(VERSION_COLUMNFAMILY_EXISTS_QUERY); // keyspace found; second check: the schema_version column family
+                return !resultSet.all().isEmpty();
+            }
+            return false; // no keyspace row, so the second query is never issued
+        } catch (Exception e) {
+            log.error(e);
+            throw new RuntimeException(e); // any failure is logged, then rethrown unchecked with the cause attached
+        }
+    }
+
+    protected int getSchemaVersion() { // Returns the largest version read from rhq.schema_version, or 0 when no row exceeds 0.
+        int maxVersion = 0;
+        try {
+            ResultSet resultSet = session.execute(VERSION_QUERY);
+            for (Row row : resultSet.all()) { // the maximum is computed here; the query has no ordering or aggregation
+                if (maxVersion < row.getInt(0)) {
+                    maxVersion = row.getInt(0);
+                }
+            }
+        } catch (Exception e) {
+            log.error(e);
+            throw new RuntimeException(e); // logged, then rethrown unchecked with the original cause
+        }
+
+        return maxVersion;
+    }
+
+    protected void removeAppliedUpdates(List<String> updateFiles, int \
currentSchemaVersion) { +        while (!updateFiles.isEmpty()) {
+            int version = this.extractVersionFromUpdateFile(updateFiles.get(0));
+            if (version <= currentSchemaVersion) {
+                updateFiles.remove(0);
+            } else {
+                break;
+            }
+        }
+    }
+
+    protected int extractVersionFromUpdateFile(String file) { // Parses the numeric file-name prefix, e.g. "schema/update/0002.xml" -> 2.
+        file = file.substring(file.lastIndexOf('/') + 1); // drop everything up to and including the last '/'
+        file = file.substring(0, file.indexOf('.')); // keep the text before the first '.'; a name with no '.' makes substring throw
+        return Integer.parseInt(file); // throws NumberFormatException if the remaining text is not an integer
+    }
+
+    protected List<String> getSteps(String file) throws Exception {
+        List<String> steps = new ArrayList<String>();
+        InputStream stream = null;
+        try {
+            stream = SchemaManager.class.getClassLoader().getResourceAsStream(file);
+
+            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+            DocumentBuilder builder = factory.newDocumentBuilder();
+            Document doc = builder.parse(stream);
+
+            Node rootDocument = \
doc.getElementsByTagName(UPDATE_PLAN_ELEMENT).item(0); +            NodeList \
updateStepElements = rootDocument.getChildNodes(); +
+            for (int index = 0; index < updateStepElements.getLength(); index++) {
+                Node updateStepElement = updateStepElements.item(index);
+                if (STEP_ELEMENT.equals(updateStepElement.getNodeName()) && \
updateStepElement.getTextContent() != null) { +                    \
steps.add(updateStepElement.getTextContent()); +                }
+            }
+        } catch (Exception e) {
+            log.error("Error reading the list of steps from " + file + " file.", e);
+            throw e;
+        } finally {
+            if (stream != null) {
+                try {
+                    stream.close();
+                } catch (Exception e) {
+                    log.error("Error closing the stream with the list of steps from \
" + file + " file.", e); +                    throw e;
+                }
+            }
+        }
+
+        return steps;
+    }
+
+    protected List<String> getUpdateFiles(String folder) throws Exception {
+        List<String> files = new ArrayList<String>();
+        InputStream stream = null;
+
+        try {
+            URL resourceFolderURL = \
this.getClass().getClassLoader().getResource(folder); +
+            if (resourceFolderURL.getProtocol().equals("file")) {
+                stream = this.getClass().getResourceAsStream(folder);
+                BufferedReader reader = new BufferedReader(new \
InputStreamReader(stream)); +
+                String updateFile;
+                while ((updateFile = reader.readLine()) != null) {
+                    files.add(folder + updateFile);
+                }
+            } else if (resourceFolderURL.getProtocol().equals("jar")) {
+                URL jarURL = \
this.getClass().getClassLoader().getResources(folder).nextElement(); +                \
JarURLConnection jarURLCon = (JarURLConnection) (jarURL.openConnection()); +          \
JarFile jarFile = jarURLCon.getJarFile(); +                Enumeration<JarEntry> \
entries = jarFile.entries(); +                while (entries.hasMoreElements()) {
+                    String entry = entries.nextElement().getName();
+                    if (entry.startsWith(folder) && !entry.equals(folder)) {
+                        files.add(entry);
+                    }
+                }
+            }
+
+            Collections.sort(files, new Comparator<String>() {
+                @Override
+                public int compare(String o1, String o2) {
+                    return o1.compareTo(o2);
+                }
+            });
+        } catch (Exception e) {
+            log.error("Error reading the list of update files.", e);
+            throw e;
+        } finally {
+            if (stream != null) {
+                try{
+                    stream.close();
+                } catch (Exception e) {
+                    log.error("Error closing the stream with the list of update \
files.", e); +                    throw e;
+                }
+            }
+        }
+
+        return files;
+    }
+
+
+    protected void initCluster() throws NoHostAvailableException {
+        String[] hostNames = new String[nodes.size()];
+        for (int i = 0; i < hostNames.length; ++i) {
+            hostNames[i] = nodes.get(i).getAddress();
+        }
+
+        log.info("Initializing session to connect to " + \
StringUtil.arrayToString(hostNames)); +
+        Cluster cluster = new \
ClusterBuilder().addContactPoints(hostNames).withCredentials(username, password) +    \
.withPort(nodes.get(0).getCqlPort()).withCompression(Compression.NONE).build(); +
+        log.info("Cluster connection configured.");
+
+        session = cluster.connect("system");
+        log.info("Cluster connected.");
+    }
+
+    protected void shutdown() { // Shuts down the Cluster that owns the current session.
+        log.info("Shutting down connections");
+        session.getCluster().shutdown(); // dereferences session directly, so it must already be set (initCluster assigns it)
+    }
+}
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/SchemaManager.java \
b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/SchemaManager.java
 index a43d06c..923065c 100644
--- a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/SchemaManager.java
                
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/SchemaManager.java
 @@ -25,32 +25,9 @@
 
 package org.rhq.cassandra.schema;
 
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.JarURLConnection;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.Enumeration;
 import java.util.List;
-import java.util.jar.JarEntry;
-import java.util.jar.JarFile;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ProtocolOptions.Compression;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.NoHostAvailableException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -58,284 +35,43 @@ import org.apache.log4j.ConsoleAppender;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
-import org.w3c.dom.Document;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
 
-import org.rhq.cassandra.util.ClusterBuilder;
 import org.rhq.core.domain.cloud.StorageNode;
-import org.rhq.core.util.StringUtil;
 
 /**
  * @author John Sanda
  */
 public class SchemaManager {
 
-    private static final String RHQ_KEYSPACE = "rhq";
-    private static final String SCHEMA_BASE_FOLDER = "schema";
-    private static final String UPDATE_PLAN_ELEMENT = "updatePlan";
-    private static final String STEP_ELEMENT = "step";
-
-    private static final String SCHEMA_EXISTS_QUERY = "SELECT * FROM \
                system.schema_keyspaces WHERE keyspace_name = 'rhq';";
-    private static final String VERSION_COLUMNFAMILY_EXISTS_QUERY = "SELECT * from \
system.schema_columnfamilies WHERE keyspace_name='rhq' AND \
                columnfamily_name='schema_version';";
-    private static final String VERSION_QUERY = "SELECT version FROM \
                rhq.schema_version";
-    private static final String INSERT_VERSION_QUERY = "INSERT INTO \
                rhq.schema_version (version, time ) VALUES ( ?, ?);";
-
-
     private final Log log = LogFactory.getLog(SchemaManager.class);
 
-    private enum Task {
-        Drop("drop"),
-        Create("create"),
-        Update("update");
-
-        private final String folder;
-
-        private Task(String folder){
-            this.folder = folder;
-        }
-
-        protected String getFolder() {
-            return "" + SCHEMA_BASE_FOLDER + "/" + this.folder + "/";
-        }
-    }
-
-    private Session session;
-    private String username;
-    private String password;
-
-    private List<StorageNode> nodes = new ArrayList<StorageNode>();
+    private final String username;
+    private final String password;
+    private final List<StorageNode> nodes = new ArrayList<StorageNode>();
 
     public SchemaManager(String username, String password, String... nodes) {
         this(username, password, parseNodeInformation(nodes));
     }
 
     public SchemaManager(String username, String password, List<StorageNode> nodes) \
                {
-        try {
-            this.username = username;
-            this.password = password;
-            this.nodes = nodes;
-
-            this.initCluster();
-            this.shutdown();
-        } catch (NoHostAvailableException e) {
-            throw new RuntimeException("Unable create storage node session.", e);
-        }
+        this.username = username;
+        this.password = password;
+        this.nodes.addAll(nodes);
     }
 
-
     public void install() throws Exception {
-        log.info("Preparing to install schema");
-        try {
-            initCluster();
-
-            if (!schemaExists()) {
-                this.executeTask(Task.Create);
-            } else {
-                log.info("RHQ schema already exists.");
-            }
-
-            this.executeTask(Task.Update);
-        } catch (NoHostAvailableException e) {
-            throw new RuntimeException(e);
-        } finally {
-            shutdown();
-        }
+        VersionManager version = new VersionManager(username, password, nodes);
+        version.install();
     }
 
     public void drop() throws Exception {
-        log.info("Preparing to drop RHQ schema");
-        try {
-            initCluster();
-
-            if (schemaExists()) {
-                this.executeTask(Task.Drop);
-            } else {
-                log.info("RHQ schema does not exist. Drop operation not required.");
-            }
-        } catch (NoHostAvailableException e) {
-            throw new RuntimeException(e);
-        } finally {
-            shutdown();
-        }
-    }
-
-    private boolean schemaExists() {
-        try {
-            ResultSet resultSet = session.execute(SCHEMA_EXISTS_QUERY);
-            if (!resultSet.all().isEmpty()) {
-                resultSet = session.execute(VERSION_COLUMNFAMILY_EXISTS_QUERY);
-                return !resultSet.all().isEmpty();
-            }
-            return false;
-        } catch (Exception e) {
-            log.error(e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    private int getSchemaVersion() {
-        int maxVersion = 0;
-        try {
-            ResultSet resultSet = session.execute(VERSION_QUERY);
-            for (Row row : resultSet.all()) {
-                if (maxVersion < row.getInt(0)) {
-                    maxVersion = row.getInt(0);
-                }
-            }
-        } catch (Exception e) {
-            log.error(e);
-            throw new RuntimeException(e);
-        }
-
-        return maxVersion;
-    }
-
-    private void executeTask(Task task) throws Exception {
-        try {
-            log.info("Starting to execute " + task + " task.");
-
-            List<String> updateFiles = this.getUpdateFiles(task);
-
-            if (Task.Update.equals(task)) {
-                int currentSchemaVersion = this.getSchemaVersion();
-                log.info("Current schema version is " + currentSchemaVersion);
-                this.removeAppliedUpdates(updateFiles, currentSchemaVersion);
-            }
-
-            if (updateFiles.size() == 0 && Task.Update.equals(task)) {
-                log.info("RHQ schema is current! No updates applied.");
-            }
-
-            for (String updateFile : updateFiles) {
-                log.info("Applying file " + updateFile + " for " + task + " task.");
-                for (String step : getSteps(updateFile)) {
-                    log.info("Statement: \n" + step);
-                    session.execute(step);
-                }
-
-                if (Task.Update.equals(task)) {
-                    this.updateSchemaVersion(updateFile);
-                }
-
-                log.info("File " + updateFile + " applied for " + task + " task.");
-            }
-        } catch (Exception e) {
-            log.error(e);
-            throw new RuntimeException(e);
-        }
-
-        log.info("Successfully executed " + task + " task.");
-    }
-
-    private void updateSchemaVersion(String test) {
-        PreparedStatement preparedStatement = session.prepare(INSERT_VERSION_QUERY);
-        BoundStatement boundStatement = \
                preparedStatement.bind(this.extractVersionFromUpdateFile(test), new \
                Date());
-        session.execute(boundStatement);
-    }
-
-    private void removeAppliedUpdates(List<String> updateFiles, int \
                currentSchemaVersion) {
-        while (!updateFiles.isEmpty()) {
-            int version = this.extractVersionFromUpdateFile(updateFiles.get(0));
-            if (version <= currentSchemaVersion) {
-                updateFiles.remove(0);
-            } else {
-                break;
-            }
-        }
-    }
-
-    private int extractVersionFromUpdateFile(String file) {
-        file = file.substring(file.lastIndexOf('/') + 1);
-        file = file.substring(0, file.indexOf('.'));
-        return Integer.parseInt(file);
+        VersionManager version = new VersionManager(username, password, nodes);
+        version.drop();
     }
 
-    private List<String> getSteps(String file) throws Exception {
-        List<String> steps = new ArrayList<String>();
-        InputStream stream = null;
-        try {
-            stream = this.getClass().getClassLoader().getResourceAsStream(file);
-
-            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
-            DocumentBuilder builder = factory.newDocumentBuilder();
-            Document doc = builder.parse(stream);
-
-            Node rootDocument = \
                doc.getElementsByTagName(UPDATE_PLAN_ELEMENT).item(0);
-            NodeList updateStepElements = rootDocument.getChildNodes();
-
-            for (int index = 0; index < updateStepElements.getLength(); index++) {
-                Node updateStepElement = updateStepElements.item(index);
-                if (STEP_ELEMENT.equals(updateStepElement.getNodeName()) && \
                updateStepElement.getTextContent() != null) {
-                    steps.add(updateStepElement.getTextContent());
-                }
-            }
-        } catch (Exception e) {
-            log.error("Error reading the list of steps from " + file + " file.", e);
-            throw e;
-        } finally {
-            if (stream != null) {
-                try {
-                    stream.close();
-                } catch (Exception e) {
-                    log.error("Error closing the stream with the list of steps from \
                " + file + " file.", e);
-                    throw e;
-                }
-            }
-        }
-
-        return steps;
-    }
-
-    private List<String> getUpdateFiles(Task task) throws Exception {
-        List<String> files = new ArrayList<String>();
-        InputStream stream = null;
-
-        try {
-            URL resourceFolderURL = \
                this.getClass().getClassLoader().getResource(task.getFolder());
-
-            if (resourceFolderURL.getProtocol().equals("file")) {
-                stream = this.getClass().getResourceAsStream(task.getFolder());
-                BufferedReader reader = new BufferedReader(new \
                InputStreamReader(stream));
-
-                String updateFile;
-                while ((updateFile = reader.readLine()) != null) {
-                    files.add(task.getFolder() + updateFile);
-                }
-            } else if (resourceFolderURL.getProtocol().equals("jar")) {
-                URL jarURL = \
                this.getClass().getClassLoader().getResources(task.getFolder()).nextElement();
                
-                JarURLConnection jarURLCon = (JarURLConnection) \
                (jarURL.openConnection());
-                JarFile jarFile = jarURLCon.getJarFile();
-                Enumeration<JarEntry> entries = jarFile.entries();
-                while (entries.hasMoreElements()) {
-                    String entry = entries.nextElement().getName();
-                    if (entry.startsWith(task.getFolder()) && \
                !entry.equals(task.getFolder())) {
-                        files.add(entry);
-                    }
-                }
-            }
-
-            Collections.sort(files, new Comparator<String>() {
-                @Override
-                public int compare(String o1, String o2) {
-                    return o1.compareTo(o2);
-                }
-            });
-        } catch (Exception e) {
-            log.error("Error reading the list of update files.", e);
-            throw e;
-        } finally {
-            if (stream != null) {
-                try{
-                    stream.close();
-                } catch (Exception e) {
-                    log.error("Error closing the stream with the list of update \
                files.", e);
-                    throw e;
-                }
-            }
-        }
-
-        return files;
+    public void updateTopology() throws Exception {
+        TopologyManager topology = new TopologyManager(username, password, nodes);
+        topology.updateTopology();
     }
 
     private static List<StorageNode> parseNodeInformation(String... nodes) {
@@ -349,28 +85,6 @@ public class SchemaManager {
         return parsedNodes;
     }
 
-    private void initCluster() throws NoHostAvailableException {
-        String[] hostNames = new String[nodes.size()];
-        for (int i = 0; i < hostNames.length; ++i) {
-            hostNames[i] = nodes.get(i).getAddress();
-        }
-
-        log.info("Initializing session to connect to " + \
                StringUtil.arrayToString(hostNames));
-
-        Cluster cluster = new \
                ClusterBuilder().addContactPoints(hostNames).withCredentials(username, \
                password)
-            .withPort(nodes.get(0).getCqlPort()).withCompression(Compression.NONE).build();
                
-
-        log.info("Cluster connection configured.");
-
-        session = cluster.connect("system");
-        log.info("Cluster connected.");
-    }
-
-    private void shutdown() {
-        log.info("Shutting down connections");
-        session.getCluster().shutdown();
-    }
-
     public static void main(String[] args) throws Exception {
         try {
             Logger root = Logger.getRootLogger();
@@ -393,16 +107,15 @@ public class SchemaManager {
             String username = args[1];
             String password = args[2];
 
-            System.out.println(args[3]);
-
             SchemaManager schemaManager = new SchemaManager(username, password,
                 Arrays.copyOfRange(args, 3, args.length));
 
-            System.out.println(command);
             if ("install".equalsIgnoreCase(command)) {
                 schemaManager.install();
             } else if ("drop".equalsIgnoreCase(command)) {
                 schemaManager.drop();
+            } else if ("topology".equalsIgnoreCase(command)) {
+                schemaManager.updateTopology();
             } else {
                 throw new IllegalArgumentException(command + " not available.");
             }
@@ -411,7 +124,5 @@ public class SchemaManager {
         } finally {
             System.exit(0);
         }
-
     }
-
 }
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/TopologyManager.java \
b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/TopologyManager.java
 new file mode 100644
index 0000000..87e2a18
--- /dev/null
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/TopologyManager.java
 @@ -0,0 +1,130 @@
+/*
+ *
+ *  * RHQ Management Platform
+ *  * Copyright (C) 2005-2012 Red Hat, Inc.
+ *  * All rights reserved.
+ *  *
+ *  * This program is free software; you can redistribute it and/or modify
+ *  * it under the terms of the GNU General Public License, version 2, as
+ *  * published by the Free Software Foundation, and/or the GNU Lesser
+ *  * General Public License, version 2.1, also as published by the Free
+ *  * Software Foundation.
+ *  *
+ *  * This program is distributed in the hope that it will be useful,
+ *  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ *  * GNU General Public License and the GNU Lesser General Public License
+ *  * for more details.
+ *  *
+ *  * You should have received a copy of the GNU General Public License
+ *  * and the GNU Lesser General Public License along with this program;
+ *  * if not, write to the Free Software Foundation, Inc.,
+ *  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ *
+ */
+
+package org.rhq.cassandra.schema;
+
+import java.util.List;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.core.domain.cloud.StorageNode;
+
+/**
+ * @author Stefan Negrea
+ */
+public class TopologyManager extends AbstractManager {
+
+    private final Log log = LogFactory.getLog(TopologyManager.class);
+
+    private static final String TOPOLOGY_BASE_FOLDER = "topology";
+
+
+    private enum Task { // Topology update tasks, each tied to one XML file name.
+        UpdateReplicationFactor("0001.xml"),
+        UpdateGCGrace("0002.xml");
+
+        private final String file;
+
+        private Task(String file) {
+            this.file = file;
+        }
+
+        protected String getFile() { // TOPOLOGY_BASE_FOLDER + "/" + file name, e.g. "topology/0001.xml"
+            return TOPOLOGY_BASE_FOLDER + "/" + this.file;
+        }
+    }
+
+    public TopologyManager(String username, String password, List<StorageNode> \
nodes) { +        super(username, password, nodes);
+    }
+
+    public void updateTopology() throws Exception { // Applies the replication-factor and gc-grace updates when the schema is installed.
+        initCluster();
+        if (schemaExists()) {
+            log.info("Applying topology updates...");
+            this.updateReplicationFactor(nodes.size()); // both updates are driven by the number of configured nodes
+            this.updateGCGrace(nodes.size());
+        } else {
+            log.info("Topology updates cannot be applied because the schema is not \
installed."); +        }
+        shutdown(); // NOTE(review): not in a finally block, so an exception thrown above skips the shutdown
+    }
+
+    private boolean updateReplicationFactor(int numberOfNodes) throws Exception {
+        log.info("Starting to execute " + Task.UpdateReplicationFactor + " task.");
+
+        int replicationFactor = 1;
+        if (numberOfNodes < 4) {
+            replicationFactor = numberOfNodes;
+        } else {
+            replicationFactor = 3;
+        }
+
+        log.info("Applying file " + Task.UpdateReplicationFactor.getFile() + " for " \
+ Task.UpdateReplicationFactor +            + " task.");
+        for (String query : this.getSteps(Task.UpdateReplicationFactor.getFile())) {
+            executedPreparedStatement(query, replicationFactor);
+        }
+        log.info("File " + Task.UpdateReplicationFactor.getFile() + " applied for " \
+ Task.UpdateReplicationFactor +            + " task.");
+
+        log.info("Successfully executed " + Task.UpdateReplicationFactor + " \
task."); +        return true;
+    }
+
+    private boolean updateGCGrace(int numberOfNodes) throws Exception {
+        log.info("Starting to execute " + Task.UpdateGCGrace + " task.");
+
+        int gcGraceSeconds = 864000;
+        if (numberOfNodes == 1) {
+            gcGraceSeconds = 0;
+        } else {
+            gcGraceSeconds = 691200; // 8 days
+        }
+
+
+        log.info("Applying file " + Task.UpdateGCGrace.getFile() + " for " + \
Task.UpdateGCGrace + " task."); +        for (String query : \
this.getSteps(Task.UpdateGCGrace.getFile())) { +            \
executedPreparedStatement(query, gcGraceSeconds); +        }
+        log.info("File " + Task.UpdateGCGrace.getFile() + " applied for " + \
Task.UpdateGCGrace + " task."); +
+        log.info("Successfully executed " + Task.UpdateGCGrace + " task.");
+        return true;
+    }
+
+    private void executedPreparedStatement(String query, Object... values) { // Formats the query with the values, then prepares and executes it.
+        String formattedQuery = String.format(query, values); // values are substituted into the query text here, not bound as statement parameters
+        log.info("Statement: \n" + formattedQuery);
+        PreparedStatement preparedStatement = session.prepare(formattedQuery);
+        BoundStatement boundStatement = preparedStatement.bind(); // bind() is called with no arguments
+        session.execute(boundStatement);
+    }
+
+}
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/VersionManager.java \
b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/VersionManager.java
 new file mode 100644
index 0000000..437e097
--- /dev/null
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/VersionManager.java
 @@ -0,0 +1,149 @@
+/*
+ *
+ *  * RHQ Management Platform
+ *  * Copyright (C) 2005-2012 Red Hat, Inc.
+ *  * All rights reserved.
+ *  *
+ *  * This program is free software; you can redistribute it and/or modify
+ *  * it under the terms of the GNU General Public License, version 2, as
+ *  * published by the Free Software Foundation, and/or the GNU Lesser
+ *  * General Public License, version 2.1, also as published by the Free
+ *  * Software Foundation.
+ *  *
+ *  * This program is distributed in the hope that it will be useful,
+ *  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ *  * GNU General Public License and the GNU Lesser General Public License
+ *  * for more details.
+ *  *
+ *  * You should have received a copy of the GNU General Public License
+ *  * and the GNU Lesser General Public License along with this program;
+ *  * if not, write to the Free Software Foundation, Inc.,
+ *  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ *
+ */
+
+package org.rhq.cassandra.schema;
+
+import java.util.Date;
+import java.util.List;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.core.domain.cloud.StorageNode;
+
+/**
+ * Installs, updates and drops the RHQ schema by executing the statement files found
+ * in the {@code schema/create/}, {@code schema/update/} and {@code schema/drop/}
+ * folders.
+ * <p>
+ * Cluster access and file handling ({@code initCluster}, {@code shutdown},
+ * {@code schemaExists}, {@code getUpdateFiles}, {@code getSteps},
+ * {@code getSchemaVersion}, {@code removeAppliedUpdates},
+ * {@code extractVersionFromUpdateFile} and the {@code session} field) are not
+ * defined in this class; they are expected to come from {@link AbstractManager}.
+ *
+ * @author Stefan Negrea
+ */
+public class VersionManager extends AbstractManager {
+
+    private static final String SCHEMA_BASE_FOLDER = "schema";
+    private static final String INSERT_VERSION_QUERY = "INSERT INTO rhq.schema_version (version, time ) VALUES ( ?, ?);";
+
+    private final Log log = LogFactory.getLog(VersionManager.class);
+
+    /** Schema operations; each one maps to a sub-folder of {@code SCHEMA_BASE_FOLDER}. */
+    private enum Task {
+        Drop("drop"),
+        Create("create"),
+        Update("update");
+
+        private final String folder;
+
+        private Task(String folder){
+            this.folder = folder;
+        }
+
+        /**
+         * @return the folder for this task, e.g. {@code schema/update/}
+         */
+        protected String getFolder() {
+            return SCHEMA_BASE_FOLDER + "/" + this.folder + "/";
+        }
+    }
+
+    /**
+     * @param username passed through to {@link AbstractManager}
+     * @param password passed through to {@link AbstractManager}
+     * @param nodes    passed through to {@link AbstractManager}
+     */
+    public VersionManager(String username, String password, List<StorageNode> nodes) {
+        super(username, password, nodes);
+    }
+
+    /**
+     * Runs the {@code Create} task when the schema does not exist yet, then always
+     * runs the {@code Update} task. {@code shutdown()} is called in every case.
+     *
+     * @throws RuntimeException wrapping a {@link NoHostAvailableException}, or
+     *                          wrapping any failure raised while executing a task
+     * @throws Exception        declared for backward compatibility with callers
+     */
+    public void install() throws Exception {
+        log.info("Preparing to install schema");
+        try {
+            initCluster();
+
+            if (!schemaExists()) {
+                this.executeTask(Task.Create);
+            } else {
+                log.info("RHQ schema already exists.");
+            }
+
+            this.executeTask(Task.Update);
+        } catch (NoHostAvailableException e) {
+            throw new RuntimeException(e);
+        } finally {
+            shutdown();
+        }
+    }
+
+    /**
+     * Runs the {@code Drop} task when the schema exists; otherwise only logs that
+     * nothing needs to be done. {@code shutdown()} is called in every case.
+     *
+     * @throws RuntimeException wrapping a {@link NoHostAvailableException}, or
+     *                          wrapping any failure raised while executing the task
+     * @throws Exception        declared for backward compatibility with callers
+     */
+    public void drop() throws Exception {
+        log.info("Preparing to drop RHQ schema");
+        try {
+            initCluster();
+
+            if (schemaExists()) {
+                this.executeTask(Task.Drop);
+            } else {
+                log.info("RHQ schema does not exist. Drop operation not required.");
+            }
+        } catch (NoHostAvailableException e) {
+            throw new RuntimeException(e);
+        } finally {
+            shutdown();
+        }
+    }
+
+    /**
+     * Executes every statement of every file in the task's folder, in the order the
+     * files are returned by {@code getUpdateFiles}.
+     * <p>
+     * For the {@code Update} task the files already covered by the current schema
+     * version are removed from the list first, and the schema version is recorded
+     * after each file has been applied.
+     *
+     * @param task the task to execute
+     * @throws RuntimeException wrapping any exception raised while executing the task
+     */
+    private void executeTask(Task task) throws Exception {
+        try {
+            log.info("Starting to execute " + task + " task.");
+
+            List<String> updateFiles = this.getUpdateFiles(task.getFolder());
+            boolean isUpdate = Task.Update.equals(task);
+
+            if (isUpdate) {
+                int currentSchemaVersion = this.getSchemaVersion();
+                log.info("Current schema version is " + currentSchemaVersion);
+                this.removeAppliedUpdates(updateFiles, currentSchemaVersion);
+
+                if (updateFiles.isEmpty()) {
+                    log.info("RHQ schema is current! No updates applied.");
+                }
+            }
+
+            for (String updateFile : updateFiles) {
+                log.info("Applying file " + updateFile + " for " + task + " task.");
+                for (String step : getSteps(updateFile)) {
+                    log.info("Statement: \n" + step);
+                    session.execute(step);
+                }
+
+                if (isUpdate) {
+                    this.updateSchemaVersion(updateFile);
+                }
+
+                log.info("File " + updateFile + " applied for " + task + " task.");
+            }
+        } catch (Exception e) {
+            // Pass the exception as the Throwable argument so the stack trace is
+            // logged; log.error(e) alone would only record e.toString().
+            log.error("Failed to execute " + task + " task.", e);
+            throw new RuntimeException(e);
+        }
+
+        log.info("Successfully executed " + task + " task.");
+    }
+
+    /**
+     * Inserts a row into {@code rhq.schema_version} holding the version extracted
+     * from the update file name and the current time.
+     *
+     * @param updateFileName name of the update file that has just been applied
+     */
+    private void updateSchemaVersion(String updateFileName) {
+        PreparedStatement preparedStatement = session.prepare(INSERT_VERSION_QUERY);
+        BoundStatement boundStatement = preparedStatement.bind(this.extractVersionFromUpdateFile(updateFileName),
+            new Date());
+        session.execute(boundStatement);
+    }
+}
diff --git a/modules/common/cassandra-schema/src/main/resources/topology/0001.xml b/modules/common/cassandra-schema/src/main/resources/topology/0001.xml
new file mode 100644
index 0000000..f4382dc
--- /dev/null
+++ b/modules/common/cassandra-schema/src/main/resources/topology/0001.xml
@@ -0,0 +1,13 @@
+<!--
+  Update plan with one step per keyspace (rhq and system_auth). Each step overwrites
+  strategy_options in system.schema_keyspaces with a replication_factor value.
+  The %s token is a placeholder that has to be replaced before the statement is run;
+  presumably the code that loads this file fills it in with String.format (confirm
+  against the caller).
+-->
+<updatePlan>
+  <step>
+    UPDATE system.schema_keyspaces SET
+    strategy_options='{"replication_factor":"%s"}'
+    WHERE keyspace_name='rhq'
+  </step>
+
+  <step>
+    UPDATE system.schema_keyspaces SET
+    strategy_options='{"replication_factor":"%s"}'
+    WHERE keyspace_name='system_auth'
+  </step>
+</updatePlan>
\ No newline at end of file
diff --git a/modules/common/cassandra-schema/src/main/resources/topology/0002.xml b/modules/common/cassandra-schema/src/main/resources/topology/0002.xml
new file mode 100644
index 0000000..250aec0
--- /dev/null
+++ b/modules/common/cassandra-schema/src/main/resources/topology/0002.xml
@@ -0,0 +1,38 @@
+<!--
+  Update plan that sets gc_grace_seconds in system.schema_columnfamilies for six
+  column families of the rhq keyspace: metrics_index, raw_metrics, one_hour_metrics,
+  six_hour_metrics, twenty_four_hour_metrics and schema_version.
+  The %s token in every step is a placeholder that has to be replaced before the
+  statement is run; presumably the code that loads this file fills it in with
+  String.format (confirm against the caller).
+-->
+<updatePlan>
+  <step>
+    UPDATE system.schema_columnfamilies
+    SET gc_grace_seconds = %s
+    WHERE keyspace_name='rhq' AND columnfamily_name='metrics_index';
+  </step>
+
+  <step>
+    UPDATE system.schema_columnfamilies
+    SET gc_grace_seconds = %s
+    WHERE keyspace_name='rhq' AND columnfamily_name='raw_metrics';
+  </step>
+
+  <step>
+    UPDATE system.schema_columnfamilies
+    SET gc_grace_seconds = %s
+    WHERE keyspace_name='rhq' AND columnfamily_name='one_hour_metrics';
+  </step>
+
+  <step>
+    UPDATE system.schema_columnfamilies
+    SET gc_grace_seconds = %s
+    WHERE keyspace_name='rhq' AND columnfamily_name='six_hour_metrics';
+  </step>
+
+  <step>
+    UPDATE system.schema_columnfamilies
+    SET gc_grace_seconds = %s
+    WHERE keyspace_name='rhq' AND columnfamily_name='twenty_four_hour_metrics';
+  </step>
+
+   <step>
+    UPDATE system.schema_columnfamilies
+    SET gc_grace_seconds = %s
+    WHERE keyspace_name='rhq' AND columnfamily_name='schema_version';
+   </step>
+
+</updatePlan>
\ No newline at end of file


_______________________________________________
rhq-commits mailing list
rhq-commits@lists.fedorahosted.org
https://lists.fedorahosted.org/mailman/listinfo/rhq-commits


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic