[prev in list] [next in list] [prev in thread] [next in thread]
List: rhq-commits
Subject: [rhq] modules/common
From: snegrea <snegrea () fedoraproject ! org>
Date: 2013-05-31 22:31:15
Message-ID: 20130531223115.792E760FCF () fedorahosted ! org
[Download RAW message or body]
modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/AbstractManager.java \
| 251 +++++++ modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/SchemaManager.java \
| 319 ---------- modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/TopologyManager.java \
| 130 ++++ modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/VersionManager.java \
| 149 ++++ modules/common/cassandra-schema/src/main/resources/topology/0001.xml \
| 13 modules/common/cassandra-schema/src/main/resources/topology/0002.xml \
| 38 + 6 files changed, 596 insertions(+), 304 deletions(-)
New commits:
commit 245357d903f998c35f093387d7ba7824ade52968
Author: Stefan Negrea <snegrea@redhat.com>
Date: Fri May 31 17:30:46 2013 -0500
Add topology management to the schema manager. No change to the existing external \
interface of the schema manager.
The topology update functionality includes updating the replication factor and the gc \
grace period.
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/AbstractManager.java \
b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/AbstractManager.java
new file mode 100644
index 0000000..be69c08
--- /dev/null
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/AbstractManager.java
@@ -0,0 +1,251 @@
+/*
+ *
+ * * RHQ Management Platform
+ * * Copyright (C) 2005-2012 Red Hat, Inc.
+ * * All rights reserved.
+ * *
+ * * This program is free software; you can redistribute it and/or modify
+ * * it under the terms of the GNU General Public License, version 2, as
+ * * published by the Free Software Foundation, and/or the GNU Lesser
+ * * General Public License, version 2.1, also as published by the Free
+ * * Software Foundation.
+ * *
+ * * This program is distributed in the hope that it will be useful,
+ * * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * * GNU General Public License and the GNU Lesser General Public License
+ * * for more details.
+ * *
+ * * You should have received a copy of the GNU General Public License
+ * * and the GNU Lesser General Public License along with this program;
+ * * if not, write to the Free Software Foundation, Inc.,
+ * * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ *
+ */
+
+package org.rhq.cassandra.schema;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ProtocolOptions.Compression;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+import org.rhq.cassandra.util.ClusterBuilder;
+import org.rhq.core.domain.cloud.StorageNode;
+import org.rhq.core.util.StringUtil;
+
+/**
+ * @author Stefan Negrea
+ */
+public class AbstractManager {
+
+ private static final String UPDATE_PLAN_ELEMENT = "updatePlan";
+ private static final String STEP_ELEMENT = "step";
+
+ private static final String SCHEMA_EXISTS_QUERY = "SELECT * FROM \
system.schema_keyspaces WHERE keyspace_name = 'rhq';"; + private static final \
String VERSION_COLUMNFAMILY_EXISTS_QUERY = "SELECT * from \
system.schema_columnfamilies WHERE keyspace_name='rhq' AND \
columnfamily_name='schema_version';"; + private static final String VERSION_QUERY \
= "SELECT version FROM rhq.schema_version"; +
+
+ private final Log log = LogFactory.getLog(AbstractManager.class);
+
+ protected Session session;
+ protected final String username;
+ protected final String password;
+ protected List<StorageNode> nodes = new ArrayList<StorageNode>();
+
+ public AbstractManager(String username, String password, List<StorageNode> \
nodes) { + try {
+ this.username = username;
+ this.password = password;
+ this.nodes = nodes;
+
+ this.initCluster();
+ this.shutdown();
+ } catch (NoHostAvailableException e) {
+ throw new RuntimeException("Unable create storage node session.", e);
+ }
+ }
+
+ protected boolean schemaExists() {
+ try {
+ ResultSet resultSet = session.execute(SCHEMA_EXISTS_QUERY);
+ if (!resultSet.all().isEmpty()) {
+ resultSet = session.execute(VERSION_COLUMNFAMILY_EXISTS_QUERY);
+ return !resultSet.all().isEmpty();
+ }
+ return false;
+ } catch (Exception e) {
+ log.error(e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected int getSchemaVersion() {
+ int maxVersion = 0;
+ try {
+ ResultSet resultSet = session.execute(VERSION_QUERY);
+ for (Row row : resultSet.all()) {
+ if (maxVersion < row.getInt(0)) {
+ maxVersion = row.getInt(0);
+ }
+ }
+ } catch (Exception e) {
+ log.error(e);
+ throw new RuntimeException(e);
+ }
+
+ return maxVersion;
+ }
+
+ protected void removeAppliedUpdates(List<String> updateFiles, int \
currentSchemaVersion) { + while (!updateFiles.isEmpty()) {
+ int version = this.extractVersionFromUpdateFile(updateFiles.get(0));
+ if (version <= currentSchemaVersion) {
+ updateFiles.remove(0);
+ } else {
+ break;
+ }
+ }
+ }
+
+ protected int extractVersionFromUpdateFile(String file) {
+ file = file.substring(file.lastIndexOf('/') + 1);
+ file = file.substring(0, file.indexOf('.'));
+ return Integer.parseInt(file);
+ }
+
+ protected List<String> getSteps(String file) throws Exception {
+ List<String> steps = new ArrayList<String>();
+ InputStream stream = null;
+ try {
+ stream = SchemaManager.class.getClassLoader().getResourceAsStream(file);
+
+ DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ DocumentBuilder builder = factory.newDocumentBuilder();
+ Document doc = builder.parse(stream);
+
+ Node rootDocument = \
doc.getElementsByTagName(UPDATE_PLAN_ELEMENT).item(0); + NodeList \
updateStepElements = rootDocument.getChildNodes(); +
+ for (int index = 0; index < updateStepElements.getLength(); index++) {
+ Node updateStepElement = updateStepElements.item(index);
+ if (STEP_ELEMENT.equals(updateStepElement.getNodeName()) && \
updateStepElement.getTextContent() != null) { + \
steps.add(updateStepElement.getTextContent()); + }
+ }
+ } catch (Exception e) {
+ log.error("Error reading the list of steps from " + file + " file.", e);
+ throw e;
+ } finally {
+ if (stream != null) {
+ try {
+ stream.close();
+ } catch (Exception e) {
+ log.error("Error closing the stream with the list of steps from \
" + file + " file.", e); + throw e;
+ }
+ }
+ }
+
+ return steps;
+ }
+
+ protected List<String> getUpdateFiles(String folder) throws Exception {
+ List<String> files = new ArrayList<String>();
+ InputStream stream = null;
+
+ try {
+ URL resourceFolderURL = \
this.getClass().getClassLoader().getResource(folder); +
+ if (resourceFolderURL.getProtocol().equals("file")) {
+ stream = this.getClass().getResourceAsStream(folder);
+ BufferedReader reader = new BufferedReader(new \
InputStreamReader(stream)); +
+ String updateFile;
+ while ((updateFile = reader.readLine()) != null) {
+ files.add(folder + updateFile);
+ }
+ } else if (resourceFolderURL.getProtocol().equals("jar")) {
+ URL jarURL = \
this.getClass().getClassLoader().getResources(folder).nextElement(); + \
JarURLConnection jarURLCon = (JarURLConnection) (jarURL.openConnection()); + \
JarFile jarFile = jarURLCon.getJarFile(); + Enumeration<JarEntry> \
entries = jarFile.entries(); + while (entries.hasMoreElements()) {
+ String entry = entries.nextElement().getName();
+ if (entry.startsWith(folder) && !entry.equals(folder)) {
+ files.add(entry);
+ }
+ }
+ }
+
+ Collections.sort(files, new Comparator<String>() {
+ @Override
+ public int compare(String o1, String o2) {
+ return o1.compareTo(o2);
+ }
+ });
+ } catch (Exception e) {
+ log.error("Error reading the list of update files.", e);
+ throw e;
+ } finally {
+ if (stream != null) {
+ try{
+ stream.close();
+ } catch (Exception e) {
+ log.error("Error closing the stream with the list of update \
files.", e); + throw e;
+ }
+ }
+ }
+
+ return files;
+ }
+
+
+ protected void initCluster() throws NoHostAvailableException {
+ String[] hostNames = new String[nodes.size()];
+ for (int i = 0; i < hostNames.length; ++i) {
+ hostNames[i] = nodes.get(i).getAddress();
+ }
+
+ log.info("Initializing session to connect to " + \
StringUtil.arrayToString(hostNames)); +
+ Cluster cluster = new \
ClusterBuilder().addContactPoints(hostNames).withCredentials(username, password) + \
.withPort(nodes.get(0).getCqlPort()).withCompression(Compression.NONE).build(); +
+ log.info("Cluster connection configured.");
+
+ session = cluster.connect("system");
+ log.info("Cluster connected.");
+ }
+
+ protected void shutdown() {
+ log.info("Shutting down connections");
+ session.getCluster().shutdown();
+ }
+}
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/SchemaManager.java \
b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/SchemaManager.java
index a43d06c..923065c 100644
--- a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/SchemaManager.java
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/SchemaManager.java
@@ -25,32 +25,9 @@
package org.rhq.cassandra.schema;
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.JarURLConnection;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.Enumeration;
import java.util.List;
-import java.util.jar.JarEntry;
-import java.util.jar.JarFile;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ProtocolOptions.Compression;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.NoHostAvailableException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -58,284 +35,43 @@ import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
-import org.w3c.dom.Document;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-import org.rhq.cassandra.util.ClusterBuilder;
import org.rhq.core.domain.cloud.StorageNode;
-import org.rhq.core.util.StringUtil;
/**
* @author John Sanda
*/
public class SchemaManager {
- private static final String RHQ_KEYSPACE = "rhq";
- private static final String SCHEMA_BASE_FOLDER = "schema";
- private static final String UPDATE_PLAN_ELEMENT = "updatePlan";
- private static final String STEP_ELEMENT = "step";
-
- private static final String SCHEMA_EXISTS_QUERY = "SELECT * FROM \
system.schema_keyspaces WHERE keyspace_name = 'rhq';";
- private static final String VERSION_COLUMNFAMILY_EXISTS_QUERY = "SELECT * from \
system.schema_columnfamilies WHERE keyspace_name='rhq' AND \
columnfamily_name='schema_version';";
- private static final String VERSION_QUERY = "SELECT version FROM \
rhq.schema_version";
- private static final String INSERT_VERSION_QUERY = "INSERT INTO \
rhq.schema_version (version, time ) VALUES ( ?, ?);";
-
-
private final Log log = LogFactory.getLog(SchemaManager.class);
- private enum Task {
- Drop("drop"),
- Create("create"),
- Update("update");
-
- private final String folder;
-
- private Task(String folder){
- this.folder = folder;
- }
-
- protected String getFolder() {
- return "" + SCHEMA_BASE_FOLDER + "/" + this.folder + "/";
- }
- }
-
- private Session session;
- private String username;
- private String password;
-
- private List<StorageNode> nodes = new ArrayList<StorageNode>();
+ private final String username;
+ private final String password;
+ private final List<StorageNode> nodes = new ArrayList<StorageNode>();
public SchemaManager(String username, String password, String... nodes) {
this(username, password, parseNodeInformation(nodes));
}
public SchemaManager(String username, String password, List<StorageNode> nodes) \
{
- try {
- this.username = username;
- this.password = password;
- this.nodes = nodes;
-
- this.initCluster();
- this.shutdown();
- } catch (NoHostAvailableException e) {
- throw new RuntimeException("Unable create storage node session.", e);
- }
+ this.username = username;
+ this.password = password;
+ this.nodes.addAll(nodes);
}
-
public void install() throws Exception {
- log.info("Preparing to install schema");
- try {
- initCluster();
-
- if (!schemaExists()) {
- this.executeTask(Task.Create);
- } else {
- log.info("RHQ schema already exists.");
- }
-
- this.executeTask(Task.Update);
- } catch (NoHostAvailableException e) {
- throw new RuntimeException(e);
- } finally {
- shutdown();
- }
+ VersionManager version = new VersionManager(username, password, nodes);
+ version.install();
}
public void drop() throws Exception {
- log.info("Preparing to drop RHQ schema");
- try {
- initCluster();
-
- if (schemaExists()) {
- this.executeTask(Task.Drop);
- } else {
- log.info("RHQ schema does not exist. Drop operation not required.");
- }
- } catch (NoHostAvailableException e) {
- throw new RuntimeException(e);
- } finally {
- shutdown();
- }
- }
-
- private boolean schemaExists() {
- try {
- ResultSet resultSet = session.execute(SCHEMA_EXISTS_QUERY);
- if (!resultSet.all().isEmpty()) {
- resultSet = session.execute(VERSION_COLUMNFAMILY_EXISTS_QUERY);
- return !resultSet.all().isEmpty();
- }
- return false;
- } catch (Exception e) {
- log.error(e);
- throw new RuntimeException(e);
- }
- }
-
- private int getSchemaVersion() {
- int maxVersion = 0;
- try {
- ResultSet resultSet = session.execute(VERSION_QUERY);
- for (Row row : resultSet.all()) {
- if (maxVersion < row.getInt(0)) {
- maxVersion = row.getInt(0);
- }
- }
- } catch (Exception e) {
- log.error(e);
- throw new RuntimeException(e);
- }
-
- return maxVersion;
- }
-
- private void executeTask(Task task) throws Exception {
- try {
- log.info("Starting to execute " + task + " task.");
-
- List<String> updateFiles = this.getUpdateFiles(task);
-
- if (Task.Update.equals(task)) {
- int currentSchemaVersion = this.getSchemaVersion();
- log.info("Current schema version is " + currentSchemaVersion);
- this.removeAppliedUpdates(updateFiles, currentSchemaVersion);
- }
-
- if (updateFiles.size() == 0 && Task.Update.equals(task)) {
- log.info("RHQ schema is current! No updates applied.");
- }
-
- for (String updateFile : updateFiles) {
- log.info("Applying file " + updateFile + " for " + task + " task.");
- for (String step : getSteps(updateFile)) {
- log.info("Statement: \n" + step);
- session.execute(step);
- }
-
- if (Task.Update.equals(task)) {
- this.updateSchemaVersion(updateFile);
- }
-
- log.info("File " + updateFile + " applied for " + task + " task.");
- }
- } catch (Exception e) {
- log.error(e);
- throw new RuntimeException(e);
- }
-
- log.info("Successfully executed " + task + " task.");
- }
-
- private void updateSchemaVersion(String test) {
- PreparedStatement preparedStatement = session.prepare(INSERT_VERSION_QUERY);
- BoundStatement boundStatement = \
preparedStatement.bind(this.extractVersionFromUpdateFile(test), new \
Date());
- session.execute(boundStatement);
- }
-
- private void removeAppliedUpdates(List<String> updateFiles, int \
currentSchemaVersion) {
- while (!updateFiles.isEmpty()) {
- int version = this.extractVersionFromUpdateFile(updateFiles.get(0));
- if (version <= currentSchemaVersion) {
- updateFiles.remove(0);
- } else {
- break;
- }
- }
- }
-
- private int extractVersionFromUpdateFile(String file) {
- file = file.substring(file.lastIndexOf('/') + 1);
- file = file.substring(0, file.indexOf('.'));
- return Integer.parseInt(file);
+ VersionManager version = new VersionManager(username, password, nodes);
+ version.drop();
}
- private List<String> getSteps(String file) throws Exception {
- List<String> steps = new ArrayList<String>();
- InputStream stream = null;
- try {
- stream = this.getClass().getClassLoader().getResourceAsStream(file);
-
- DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
- DocumentBuilder builder = factory.newDocumentBuilder();
- Document doc = builder.parse(stream);
-
- Node rootDocument = \
doc.getElementsByTagName(UPDATE_PLAN_ELEMENT).item(0);
- NodeList updateStepElements = rootDocument.getChildNodes();
-
- for (int index = 0; index < updateStepElements.getLength(); index++) {
- Node updateStepElement = updateStepElements.item(index);
- if (STEP_ELEMENT.equals(updateStepElement.getNodeName()) && \
updateStepElement.getTextContent() != null) {
- steps.add(updateStepElement.getTextContent());
- }
- }
- } catch (Exception e) {
- log.error("Error reading the list of steps from " + file + " file.", e);
- throw e;
- } finally {
- if (stream != null) {
- try {
- stream.close();
- } catch (Exception e) {
- log.error("Error closing the stream with the list of steps from \
" + file + " file.", e);
- throw e;
- }
- }
- }
-
- return steps;
- }
-
- private List<String> getUpdateFiles(Task task) throws Exception {
- List<String> files = new ArrayList<String>();
- InputStream stream = null;
-
- try {
- URL resourceFolderURL = \
this.getClass().getClassLoader().getResource(task.getFolder());
-
- if (resourceFolderURL.getProtocol().equals("file")) {
- stream = this.getClass().getResourceAsStream(task.getFolder());
- BufferedReader reader = new BufferedReader(new \
InputStreamReader(stream));
-
- String updateFile;
- while ((updateFile = reader.readLine()) != null) {
- files.add(task.getFolder() + updateFile);
- }
- } else if (resourceFolderURL.getProtocol().equals("jar")) {
- URL jarURL = \
this.getClass().getClassLoader().getResources(task.getFolder()).nextElement();
- JarURLConnection jarURLCon = (JarURLConnection) \
(jarURL.openConnection());
- JarFile jarFile = jarURLCon.getJarFile();
- Enumeration<JarEntry> entries = jarFile.entries();
- while (entries.hasMoreElements()) {
- String entry = entries.nextElement().getName();
- if (entry.startsWith(task.getFolder()) && \
!entry.equals(task.getFolder())) {
- files.add(entry);
- }
- }
- }
-
- Collections.sort(files, new Comparator<String>() {
- @Override
- public int compare(String o1, String o2) {
- return o1.compareTo(o2);
- }
- });
- } catch (Exception e) {
- log.error("Error reading the list of update files.", e);
- throw e;
- } finally {
- if (stream != null) {
- try{
- stream.close();
- } catch (Exception e) {
- log.error("Error closing the stream with the list of update \
files.", e);
- throw e;
- }
- }
- }
-
- return files;
+ public void updateTopology() throws Exception {
+ TopologyManager topology = new TopologyManager(username, password, nodes);
+ topology.updateTopology();
}
private static List<StorageNode> parseNodeInformation(String... nodes) {
@@ -349,28 +85,6 @@ public class SchemaManager {
return parsedNodes;
}
- private void initCluster() throws NoHostAvailableException {
- String[] hostNames = new String[nodes.size()];
- for (int i = 0; i < hostNames.length; ++i) {
- hostNames[i] = nodes.get(i).getAddress();
- }
-
- log.info("Initializing session to connect to " + \
StringUtil.arrayToString(hostNames));
-
- Cluster cluster = new \
ClusterBuilder().addContactPoints(hostNames).withCredentials(username, \
password)
- .withPort(nodes.get(0).getCqlPort()).withCompression(Compression.NONE).build();
-
- log.info("Cluster connection configured.");
-
- session = cluster.connect("system");
- log.info("Cluster connected.");
- }
-
- private void shutdown() {
- log.info("Shutting down connections");
- session.getCluster().shutdown();
- }
-
public static void main(String[] args) throws Exception {
try {
Logger root = Logger.getRootLogger();
@@ -393,16 +107,15 @@ public class SchemaManager {
String username = args[1];
String password = args[2];
- System.out.println(args[3]);
-
SchemaManager schemaManager = new SchemaManager(username, password,
Arrays.copyOfRange(args, 3, args.length));
- System.out.println(command);
if ("install".equalsIgnoreCase(command)) {
schemaManager.install();
} else if ("drop".equalsIgnoreCase(command)) {
schemaManager.drop();
+ } else if ("topology".equalsIgnoreCase(command)) {
+ schemaManager.updateTopology();
} else {
throw new IllegalArgumentException(command + " not available.");
}
@@ -411,7 +124,5 @@ public class SchemaManager {
} finally {
System.exit(0);
}
-
}
-
}
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/TopologyManager.java \
b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/TopologyManager.java
new file mode 100644
index 0000000..87e2a18
--- /dev/null
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/TopologyManager.java
@@ -0,0 +1,130 @@
+/*
+ *
+ * * RHQ Management Platform
+ * * Copyright (C) 2005-2012 Red Hat, Inc.
+ * * All rights reserved.
+ * *
+ * * This program is free software; you can redistribute it and/or modify
+ * * it under the terms of the GNU General Public License, version 2, as
+ * * published by the Free Software Foundation, and/or the GNU Lesser
+ * * General Public License, version 2.1, also as published by the Free
+ * * Software Foundation.
+ * *
+ * * This program is distributed in the hope that it will be useful,
+ * * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * * GNU General Public License and the GNU Lesser General Public License
+ * * for more details.
+ * *
+ * * You should have received a copy of the GNU General Public License
+ * * and the GNU Lesser General Public License along with this program;
+ * * if not, write to the Free Software Foundation, Inc.,
+ * * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ *
+ */
+
+package org.rhq.cassandra.schema;
+
+import java.util.List;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.core.domain.cloud.StorageNode;
+
+/**
+ * @author Stefan Negrea
+ */
+public class TopologyManager extends AbstractManager {
+
+ private final Log log = LogFactory.getLog(TopologyManager.class);
+
+ private static final String TOPOLOGY_BASE_FOLDER = "topology";
+
+
+ private enum Task {
+ UpdateReplicationFactor("0001.xml"),
+ UpdateGCGrace("0002.xml");
+
+ private final String file;
+
+ private Task(String file) {
+ this.file = file;
+ }
+
+ protected String getFile() {
+ return TOPOLOGY_BASE_FOLDER + "/" + this.file;
+ }
+ }
+
+ public TopologyManager(String username, String password, List<StorageNode> \
nodes) { + super(username, password, nodes);
+ }
+
+ public void updateTopology() throws Exception {
+ initCluster();
+ if (schemaExists()) {
+ log.info("Applying topology updates...");
+ this.updateReplicationFactor(nodes.size());
+ this.updateGCGrace(nodes.size());
+ } else {
+ log.info("Topology updates cannot be applied because the schema is not \
installed."); + }
+ shutdown();
+ }
+
+ private boolean updateReplicationFactor(int numberOfNodes) throws Exception {
+ log.info("Starting to execute " + Task.UpdateReplicationFactor + " task.");
+
+ int replicationFactor = 1;
+ if (numberOfNodes < 4) {
+ replicationFactor = numberOfNodes;
+ } else {
+ replicationFactor = 3;
+ }
+
+ log.info("Applying file " + Task.UpdateReplicationFactor.getFile() + " for " \
+ Task.UpdateReplicationFactor + + " task.");
+ for (String query : this.getSteps(Task.UpdateReplicationFactor.getFile())) {
+ executedPreparedStatement(query, replicationFactor);
+ }
+ log.info("File " + Task.UpdateReplicationFactor.getFile() + " applied for " \
+ Task.UpdateReplicationFactor + + " task.");
+
+ log.info("Successfully executed " + Task.UpdateReplicationFactor + " \
task."); + return true;
+ }
+
+ private boolean updateGCGrace(int numberOfNodes) throws Exception {
+ log.info("Starting to execute " + Task.UpdateGCGrace + " task.");
+
+ int gcGraceSeconds = 864000;
+ if (numberOfNodes == 1) {
+ gcGraceSeconds = 0;
+ } else {
+ gcGraceSeconds = 691200; // 8 days
+ }
+
+
+ log.info("Applying file " + Task.UpdateGCGrace.getFile() + " for " + \
Task.UpdateGCGrace + " task."); + for (String query : \
this.getSteps(Task.UpdateGCGrace.getFile())) { + \
executedPreparedStatement(query, gcGraceSeconds); + }
+ log.info("File " + Task.UpdateGCGrace.getFile() + " applied for " + \
Task.UpdateGCGrace + " task."); +
+ log.info("Successfully executed " + Task.UpdateGCGrace + " task.");
+ return true;
+ }
+
+ private void executedPreparedStatement(String query, Object... values) {
+ String formattedQuery = String.format(query, values);
+ log.info("Statement: \n" + formattedQuery);
+ PreparedStatement preparedStatement = session.prepare(formattedQuery);
+ BoundStatement boundStatement = preparedStatement.bind();
+ session.execute(boundStatement);
+ }
+
+}
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/VersionManager.java \
b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/VersionManager.java
new file mode 100644
index 0000000..437e097
--- /dev/null
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/VersionManager.java
@@ -0,0 +1,149 @@
+/*
+ *
+ * * RHQ Management Platform
+ * * Copyright (C) 2005-2012 Red Hat, Inc.
+ * * All rights reserved.
+ * *
+ * * This program is free software; you can redistribute it and/or modify
+ * * it under the terms of the GNU General Public License, version 2, as
+ * * published by the Free Software Foundation, and/or the GNU Lesser
+ * * General Public License, version 2.1, also as published by the Free
+ * * Software Foundation.
+ * *
+ * * This program is distributed in the hope that it will be useful,
+ * * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * * GNU General Public License and the GNU Lesser General Public License
+ * * for more details.
+ * *
+ * * You should have received a copy of the GNU General Public License
+ * * and the GNU Lesser General Public License along with this program;
+ * * if not, write to the Free Software Foundation, Inc.,
+ * * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ *
+ */
+
+package org.rhq.cassandra.schema;
+
+import java.util.Date;
+import java.util.List;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.core.domain.cloud.StorageNode;
+
+/**
+ * @author Stefan Negrea
+ */
+public class VersionManager extends AbstractManager {
+
+ private static final String SCHEMA_BASE_FOLDER = "schema";
+ private static final String INSERT_VERSION_QUERY = "INSERT INTO \
rhq.schema_version (version, time ) VALUES ( ?, ?);"; +
+ private final Log log = LogFactory.getLog(VersionManager.class);
+
+ private enum Task {
+ Drop("drop"),
+ Create("create"),
+ Update("update");
+
+ private final String folder;
+
+ private Task(String folder){
+ this.folder = folder;
+ }
+
+ protected String getFolder() {
+ return SCHEMA_BASE_FOLDER + "/" + this.folder + "/";
+ }
+ }
+
+ public VersionManager(String username, String password, List<StorageNode> nodes) \
{ + super(username, password, nodes);
+ }
+
+ public void install() throws Exception {
+ log.info("Preparing to install schema");
+ try {
+ initCluster();
+
+ if (!schemaExists()) {
+ this.executeTask(Task.Create);
+ } else {
+ log.info("RHQ schema already exists.");
+ }
+
+ this.executeTask(Task.Update);
+ } catch (NoHostAvailableException e) {
+ throw new RuntimeException(e);
+ } finally {
+ shutdown();
+ }
+ }
+
+ public void drop() throws Exception {
+ log.info("Preparing to drop RHQ schema");
+ try {
+ initCluster();
+
+ if (schemaExists()) {
+ this.executeTask(Task.Drop);
+ } else {
+ log.info("RHQ schema does not exist. Drop operation not required.");
+ }
+ } catch (NoHostAvailableException e) {
+ throw new RuntimeException(e);
+ } finally {
+ shutdown();
+ }
+ }
+
+ private void executeTask(Task task) throws Exception {
+ try {
+ log.info("Starting to execute " + task + " task.");
+
+ List<String> updateFiles = this.getUpdateFiles(task.getFolder());
+
+ if (Task.Update.equals(task)) {
+ int currentSchemaVersion = this.getSchemaVersion();
+ log.info("Current schema version is " + currentSchemaVersion);
+ this.removeAppliedUpdates(updateFiles, currentSchemaVersion);
+ }
+
+ if (updateFiles.size() == 0 && Task.Update.equals(task)) {
+ log.info("RHQ schema is current! No updates applied.");
+ }
+
+ for (String updateFile : updateFiles) {
+ log.info("Applying file " + updateFile + " for " + task + " task.");
+ for (String step : getSteps(updateFile)) {
+ log.info("Statement: \n" + step);
+ session.execute(step);
+ }
+
+ if (Task.Update.equals(task)) {
+ this.updateSchemaVersion(updateFile);
+ }
+
+ log.info("File " + updateFile + " applied for " + task + " task.");
+ }
+ } catch (Exception e) {
+ log.error(e);
+ throw new RuntimeException(e);
+ }
+
+ log.info("Successfully executed " + task + " task.");
+ }
+
+ private void updateSchemaVersion(String updateFileName) {
+ PreparedStatement preparedStatement = session.prepare(INSERT_VERSION_QUERY);
+ BoundStatement boundStatement = \
preparedStatement.bind(this.extractVersionFromUpdateFile(updateFileName), + \
new Date()); + session.execute(boundStatement);
+ }
+}
diff --git a/modules/common/cassandra-schema/src/main/resources/topology/0001.xml \
b/modules/common/cassandra-schema/src/main/resources/topology/0001.xml new file mode \
100644 index 0000000..f4382dc
--- /dev/null
+++ b/modules/common/cassandra-schema/src/main/resources/topology/0001.xml
@@ -0,0 +1,13 @@
+<updatePlan>
+ <step>
+ UPDATE system.schema_keyspaces SET
+ strategy_options='{"replication_factor":"%s"}'
+ WHERE keyspace_name='rhq'
+ </step>
+
+ <step>
+ UPDATE system.schema_keyspaces SET
+ strategy_options='{"replication_factor":"%s"}'
+ WHERE keyspace_name='system_auth'
+ </step>
+</updatePlan>
\ No newline at end of file
diff --git a/modules/common/cassandra-schema/src/main/resources/topology/0002.xml \
b/modules/common/cassandra-schema/src/main/resources/topology/0002.xml new file mode \
100644 index 0000000..250aec0
--- /dev/null
+++ b/modules/common/cassandra-schema/src/main/resources/topology/0002.xml
@@ -0,0 +1,38 @@
+<updatePlan>
+ <step>
+ UPDATE system.schema_columnfamilies
+ SET gc_grace_seconds = %s
+ WHERE keyspace_name='rhq' AND columnfamily_name='metrics_index';
+ </step>
+
+ <step>
+ UPDATE system.schema_columnfamilies
+ SET gc_grace_seconds = %s
+ WHERE keyspace_name='rhq' AND columnfamily_name='raw_metrics';
+ </step>
+
+ <step>
+ UPDATE system.schema_columnfamilies
+ SET gc_grace_seconds = %s
+ WHERE keyspace_name='rhq' AND columnfamily_name='one_hour_metrics';
+ </step>
+
+ <step>
+ UPDATE system.schema_columnfamilies
+ SET gc_grace_seconds = %s
+ WHERE keyspace_name='rhq' AND columnfamily_name='six_hour_metrics';
+ </step>
+
+ <step>
+ UPDATE system.schema_columnfamilies
+ SET gc_grace_seconds = %s
+ WHERE keyspace_name='rhq' AND columnfamily_name='twenty_four_hour_metrics';
+ </step>
+
+ <step>
+ UPDATE system.schema_columnfamilies
+ SET gc_grace_seconds = %s
+ WHERE keyspace_name='rhq' AND columnfamily_name='schema_version';
+ </step>
+
+</updatePlan>
\ No newline at end of file
_______________________________________________
rhq-commits mailing list
rhq-commits@lists.fedorahosted.org
https://lists.fedorahosted.org/mailman/listinfo/rhq-commits
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic