[prev in list] [next in list] [prev in thread] [next in thread] 

List:       collectd
Subject:    [collectd]  New CSV like plugin and help with patches
From:       Daniel Hilst <danielhilst () gmail ! com>
Date:       2012-07-26 0:10:23
Message-ID: 50108AEF.2060801 () gmail ! com
[Download RAW message or body]

Hi,

I wrote a new CSV plugin based on the csv plugin source. This
plugin creates only one CSV file and writes only the values that you
have requested. The "original" csv plugin writes every value
collected; if you have a machine receiving data from other
collectd instances, you end up with a huge amount of data being written.

This plugin helps you keep only what you want in the CSV
file. The file is also rotated daily, as the csv plugin does. It is
useful for data that
will undergo further processing. We use it to get these values
written to a database, for example.

Here is an example of its configuration block

<Plugin "pax_write">
        DataDir "/opt/collectd/var/lib/collectd/csv/"
        FilePrefix "data-file"
        StoreRates false

<Host "localhost">
              Plugin "memory/memory-free" # If you don't pass a datasource
<Plugin "load/load">        # then all are written to file
                      DS "shortterm" "midterm"
</Plugin>
</Host>
</Plugin>

I'm also here asking for help. I've forked[1] octo's git
repo and written my changes to that fork. How can I
generate a patch for the new plugin? I can't do it against the tarball
source because I need to apply changes to configure.in and
src/Makefile.am and run the build.sh script, which is only available in
the git version. What should I do?

I'm attaching the plugin source here. Here is how to make it work:

- Clone from collectd git
- Download the source of plugin attached to this message and put it on 
src directory
- Put this line on configure.in
AC_PLUGIN([pax_write], [yes], [Pax write  output plugin])

- Put these lines in src/Makefile.am
if BUILD_PLUGIN_PAX_WRITE
pkglib_LTLIBRARIES += pax_write.la
pax_write_la_SOURCES = pax_write.c
pax_write_la_LDFLAGS = -module -avoid-version
collectd_LDADD += "-dlopen" pax_write.la
collectd_DEPENDENCIES += pax_write.la
endif

- Run build.sh
- ./configure && make install all

Thanks in advance!
Cheers!

Hilst

[1] https://github.com/gkos/collectd


["pax_write.c" (text/x-c)]

/**
 * collectd - src/pax_write.c
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; only version 2 of the License is applicable.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
 *
 * Authors of csv plugin:
 *   Florian octo Forster <octo at verplant.org>
 *   Doug MacEachern <dougm@hyperic.com>
 *
 * Author of this plugin:
 *   Daniel Hilst <danielhilst@gmail.com>
 *   
 **/

/* Description:
 *  This plugin creates a CSV file containing only
 *  the values that you request. It is based on the csv plugin.
 *
 *  Here is an example of config block
 *
 *  <Plugin "pax_write">
 * 	DataDir "/opt/collectd/var/lib/collectd/csv/"
 *         FilePrefix "data-file"
 * 	   StoreRates false
 *         
 *          <Host "localhost">
 *               Plugin "memory/memory-free" # If you don't pass a datasource
 *               <Plugin "load/load">        # then all are written to the file
 *                       DS "shortterm" "midterm"
 *               </Plugin>
 *         </Host>
 * </Plugin>
 *
 * 
 */

/*
 * Creates an empty AVL tree whose keys are compared with strcmp(), i.e. a
 * tree keyed by NUL-terminated strings. Evaluates to whatever c_avl_create()
 * returns; callers in this file check the result for NULL.
 * NOTE(review): strcmp is cast to a comparator taking `const void *'
 * arguments; this relies on the two function pointer types being call
 * compatible on the target platform.
 */
#define C_AVL_CREATE_STRCMP() (c_avl_create((int (*)(const void *, const void \
*))strcmp))

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#include "collectd.h"
#include "plugin.h"
#include "common.h"
#include "utils_cache.h"
#include "utils_parse_option.h"
#include "utils_avltree.h"

/*
 * Private variables
 */


/* Shared key used in the selection trees when a plugin or type has no
 * instance part. */
static char *empty_str = "";
/* Output directory, set by the "DataDir" option; the configuration code
 * only accepts values ending in '/'. NULL until configured. */
static char *datadir   = NULL;
/* Set from the "StoreRates" option: non-zero means non-gauge values are
 * written as rates instead of raw values. */
static int store_rates = 0;
/* Root of the nested selection trees:
 * host -> plugin -> plugin instance -> type -> type instance -> data sources.
 * NULL until the first <Host> block is parsed. */
static c_avl_tree_t *hosts_tree = NULL;
/* static char **include_plugins_keys = NULL; */
/* static char **include_plugins_values = NULL; */
/* Prefix of the output file name, set by the "FilePrefix" option.
 * NULL until configured. */
static char *fileprefix = NULL;


/*
 * Formats the data sources of `vl' that were selected in the configuration
 * as CSV records, one record per selected data source:
 *
 *   host,plugin[-plugin_instance],type[-type_instance],ds_name,value,time
 *
 * Records are separated by '\n'; no trailing newline is written.
 *
 * A value list is selected when the chain
 *   hosts_tree -> plugin -> plugin instance -> type -> type instance
 * exists for it; within that, a data source is selected when its name (or
 * the special key "ALL") is present in the innermost tree.
 *
 * Parameters:
 *  buffer, buffer_len  destination buffer and its size in bytes
 *  ds                  data set describing the data sources of `vl'
 *  vl                  value list to format
 *
 * Returns:
 *  0 on success; `buffer' holds at least one record
 *  1 if nothing has to be written for this value list (not an error)
 * -1 on error (unknown data source type, buffer too small, rate lookup
 *    failure)
 */
static int value_list_to_string (char *buffer, int buffer_len,
                                 const data_set_t *ds, const value_list_t *vl)
{
        int offset = 0;
        int status;
        int i;
        gauge_t *rates = NULL;
        void *tree_ptr = NULL;
        c_avl_tree_t *plugin_tree;
        c_avl_tree_t *pinstance_tree;
        c_avl_tree_t *type_tree;
        c_avl_tree_t *tinstance_tree;
        c_avl_tree_t *datasource_tree;

        assert (0 == strcmp (ds->type, vl->type));

        if ((buffer == NULL) || (buffer_len < 1))
                return (-1);

        /* No <Host> block was configured, so nothing is selected. Checked
         * here because c_avl_get() must not be called on a NULL tree. */
        if (hosts_tree == NULL)
                return (1);

        /* Successive lookups:
         * hosts_tree->plugin_tree->pinstance_tree->type_tree->tinstance_tree->datasources
         */
        if (c_avl_get (hosts_tree, vl->host, &tree_ptr) != 0)
                return (1);
        plugin_tree = tree_ptr;
        DEBUG ("pax write plugin: value_list_to_string: Found host '%s'", vl->host);

        if (c_avl_get (plugin_tree, vl->plugin, &tree_ptr) != 0)
                return (1);
        pinstance_tree = tree_ptr;
        DEBUG ("pax write plugin: value_list_to_string: Found plugin '%s'",
               vl->plugin);

        if (c_avl_get (pinstance_tree, vl->plugin_instance, &tree_ptr) != 0)
                return (1);
        type_tree = tree_ptr;
        DEBUG ("pax write plugin: value_list_to_string: Found plugin instance '%s'",
               vl->plugin_instance);

        if (c_avl_get (type_tree, vl->type, &tree_ptr) != 0)
                return (1);
        tinstance_tree = tree_ptr;
        DEBUG ("pax write plugin: value_list_to_string: Found type '%s'", vl->type);

        if (c_avl_get (tinstance_tree, vl->type_instance, &tree_ptr) != 0)
                return (1);
        datasource_tree = tree_ptr;
        DEBUG ("pax write plugin: value_list_to_string: Found type instance '%s'",
               vl->type_instance);

        /* The buffer receives the line(s) to be printed */
        memset (buffer, '\0', buffer_len);
        for (i = 0; i < ds->ds_num; i++)
        {
                /* Skip data sources that were not requested */
                if (c_avl_get (datasource_tree, ds->ds[i].name, NULL) != 0 &&
                    c_avl_get (datasource_tree, "ALL", NULL) != 0)
                        continue;

                DEBUG ("pax write plugin: value_list_to_string: Found datasource '%s'",
                       ds->ds[i].name);

                /* Separate records with a newline. This is keyed on what has
                 * been written so far rather than on the loop index, so that
                 * skipping the first data source does not produce a leading
                 * empty line. */
                if (offset > 0)
                {
                        status = ssnprintf (buffer + offset, buffer_len - offset, "\n");
                        if ((status < 1) || (status >= buffer_len - offset))
                                goto fail;
                        offset += status;
                }

                /* Host */
                status = ssnprintf (buffer + offset, buffer_len - offset,
                                    "%s", vl->host);
                if ((status < 1) || (status >= buffer_len - offset))
                        goto fail;
                offset += status;

                /* Plugin and (optional) plugin instance */
                if (strlen (vl->plugin_instance) == 0)
                        status = ssnprintf (buffer + offset, buffer_len - offset,
                                            ",%s", vl->plugin);
                else
                        status = ssnprintf (buffer + offset, buffer_len - offset,
                                            ",%s-%s", vl->plugin, vl->plugin_instance);
                if ((status < 1) || (status >= buffer_len - offset))
                        goto fail;
                offset += status;

                /* Type and (optional) type instance */
                if (strlen (vl->type_instance) == 0)
                        status = ssnprintf (buffer + offset, buffer_len - offset,
                                            ",%s", vl->type);
                else
                        status = ssnprintf (buffer + offset, buffer_len - offset,
                                            ",%s-%s", vl->type, vl->type_instance);
                if ((status < 1) || (status >= buffer_len - offset))
                        goto fail;
                offset += status;

                /* Data source name */
                status = ssnprintf (buffer + offset, buffer_len - offset,
                                    ",%s", ds->ds[i].name);
                if ((status < 1) || (status >= buffer_len - offset))
                        goto fail;
                offset += status;

                /* Data source value. Unknown types are rejected up front so
                 * that one of the branches below is always taken. */
                if ((ds->ds[i].type != DS_TYPE_COUNTER)
                                && (ds->ds[i].type != DS_TYPE_GAUGE)
                                && (ds->ds[i].type != DS_TYPE_DERIVE)
                                && (ds->ds[i].type != DS_TYPE_ABSOLUTE))
                        goto fail;

                if (ds->ds[i].type == DS_TYPE_GAUGE)
                {
                        status = ssnprintf (buffer + offset, buffer_len - offset,
                                            ",%lf", vl->values[i].gauge);
                }
                else if (store_rates != 0)
                {
                        /* Rates are fetched once and reused for the
                         * remaining data sources of this value list. */
                        if (rates == NULL)
                                rates = uc_get_rate (ds, vl);
                        if (rates == NULL)
                        {
                                WARNING ("pax plugin: "
                                                "uc_get_rate failed.");
                                goto fail;
                        }
                        status = ssnprintf (buffer + offset, buffer_len - offset,
                                            ",%lf", rates[i]);
                }
                else if (ds->ds[i].type == DS_TYPE_COUNTER)
                {
                        status = ssnprintf (buffer + offset, buffer_len - offset,
                                            ",%llu", vl->values[i].counter);
                }
                else if (ds->ds[i].type == DS_TYPE_DERIVE)
                {
                        status = ssnprintf (buffer + offset, buffer_len - offset,
                                            ",%"PRIi64, vl->values[i].derive);
                }
                else /* DS_TYPE_ABSOLUTE */
                {
                        status = ssnprintf (buffer + offset, buffer_len - offset,
                                            ",%"PRIu64, vl->values[i].absolute);
                }
                if ((status < 1) || (status >= buffer_len - offset))
                        goto fail;
                offset += status;

                /* Epoch/Time */
                status = ssnprintf (buffer + offset, buffer_len - offset, ",%.3f",
                                    CDTIME_T_TO_DOUBLE (vl->time));
                if ((status < 1) || (status >= buffer_len - offset))
                        goto fail;
                offset += status;
        } /* for ds->ds_num */

        sfree (rates);

        /* None of the data sources was selected: tell the caller to skip
         * this value list instead of writing an empty line. */
        return ((offset > 0) ? 0 : 1);

fail:
        /* Single exit for all errors so `rates' is released on every path */
        sfree (rates);
        return (-1);
} /* int value_list_to_string */

static int value_list_to_filename (char *buffer, int buffer_len, const char \
*fileprefix) {
	int status;
        int offset = 0;

	if (datadir != NULL)
	{
		status = ssnprintf (buffer, buffer_len,
				"%s", datadir);
		if ((status < 1) || (status >= buffer_len))
			return (-1);
		offset += status;
	} else {
                ERROR ("pax write plugin: value_list_to_filename: DataDir is NULL at \
%d", __LINE__);  return (-1);
        }

        assert(fileprefix);
        status = ssnprintf (buffer + offset, buffer_len,
                            "%s", fileprefix);
        if ((status < 1) || (status >= buffer_len))
                return (-1);
        offset += status;

        time_t now;
        struct tm stm;

        /* TODO: Find a way to minimize the calls to `localtime_r',
         * since they are pretty expensive.. */
        now = time (NULL);
        if (localtime_r (&now, &stm) == NULL)
        {
                ERROR ("pax plugin: localtime_r failed");
                return (1);
        }

        strftime (buffer + offset, buffer_len - offset,
                  "-%Y-%m-%d", &stm);
                

	return (0);
} /* int value_list_to_filename */


static int pax_create_file (const char *filename, const data_set_t *ds)
{
        int fd;

        if (check_create_dir (datadir))
                return (-1);

        fd = creat (filename, S_IRWXU | S_IWUSR | S_IRGRP | S_IROTH);
        if (fd == -1)
        {
                char errbuf[1024];
                ERROR ("pax plugin: creat (%s) failed: %s",
                                filename,
                                sstrerror (errno, errbuf, sizeof (errbuf)));
                return (-1);
        }

        close (fd);

        return 0;
} /* int pax_create_file */


/*
 * Returns the subtree stored under `key' in `parent'; when there is none, an
 * empty subtree is created and inserted first.
 *
 * c_avl_insert() keeps the key pointer it is given (the string is not
 * copied), so `key' must stay valid for as long as the tree exists. When
 * the key pointer was actually inserted and `key_used' is not NULL,
 * `*key_used' is set to 1 so the caller knows it must not free the string.
 *
 * Returns NULL if the subtree could not be allocated or inserted.
 */
static c_avl_tree_t *pax_subtree_get_or_create (c_avl_tree_t *parent,
                                                char *key, int *key_used)
{
        void *found = NULL;
        c_avl_tree_t *child;

        if (c_avl_get (parent, key, &found) == 0)
                return (found);

        child = C_AVL_CREATE_STRCMP ();
        if (child == NULL)
                return (NULL);

        if (c_avl_insert (parent, key, child) != 0)
        {
                c_avl_destroy (child);
                return (NULL);
        }

        if (key_used != NULL)
                *key_used = 1;
        return (child);
}

/*
 * Registers one "Plugin" entry of a <Host> block in the selection trees.
 *
 * The entry's argument has the form
 *   PLUGIN[-PLUGIN_INSTANCE]/TYPE[-TYPE_INSTANCE]
 * and is split in place into its four parts; missing instances are
 * represented by the shared empty string. The parts become the keys of the
 * nested trees
 *   host -> plugin -> plugin instance -> type -> type instance -> data sources
 *
 * If the entry is a block with one child option, the child's values are the
 * names of the data sources to write. If it has no children, the special
 * key "ALL" is stored, meaning every data source is written.
 *
 * Parameters:
 *  hostname    host the entry belongs to; used as key of the top-level tree
 *              when the host is not present yet (ownership then moves to
 *              the tree)
 *  pluginopt   the "Plugin" configuration item
 *  hosts_tree  top-level tree to register the entry in
 *
 * Returns 0 on success, -1 on error.
 */
static int pax_config_add_plugin(char *hostname, oconfig_item_t *pluginopt, c_avl_tree_t *hosts_tree)
{
        int j;
        int status = -1;
        int pname_used = 0; /* set once a tree key points into `pname' */
        char *pname = NULL;
        char *ptr = NULL;
        char *plugin = NULL;
        char *pinstance = NULL;
        char *type = NULL;
        char *tinstance = NULL;
        c_avl_tree_t *plugin_tree;
        c_avl_tree_t *pinstance_tree;
        c_avl_tree_t *type_tree;
        c_avl_tree_t *tinstance_tree;
        c_avl_tree_t *datasource_tree;

        if (hosts_tree == NULL)
        {
                ERROR("pax write plugin: pax_config_add_plugin: No hosts tree given");
                return (-1);
        }

        /* `.value' is a union; only read `.string' from string values */
        if ((pluginopt->values_num != 1)
            || (pluginopt->values[0].type != OCONFIG_TYPE_STRING))
        {
                ERROR("pax write plugin: pax_config_add_plugin: Plugin option "
                      "expects exactly one string argument");
                return (-1);
        }

        /* Work on a private copy; it is split in place below */
        pname = strdup(pluginopt->values[0].value.string);
        if (!pname)
        {
                char errbuf[1024];
                ERROR("pax write plugin: pax_config_add_plugin: Can't allocate memory for plugin name: "
                      "%s", sstrerror(errno, errbuf, sizeof(errbuf)));
                return (-1);
        }

        ptr = strchr(pname, '/');
        if (!ptr)
        {
                ERROR("pax write plugin: pax_config_add_plugin: Plugin option value should be in format "
                      "PLUGIN[-PLUGIN_INSTANCE]/TYPE[-TYPE_INSTANCE]");
                goto out;
        }
        *ptr++ = '\0';

        plugin = pname;
        type = ptr;

        ptr = strchr(plugin, '-');
        if (ptr)
        {
                *ptr++ = '\0';
                pinstance = ptr;
        }
        else
                pinstance = empty_str;

        ptr = strchr(type, '-');
        if (ptr)
        {
                *ptr++ = '\0';
                tinstance = ptr;
        }
        else
                tinstance = empty_str;

        /* Build (or walk) the chain of trees down to the data sources. The
         * plugin instance and type instance may be the empty string "" and
         * are still usable as tree keys. `empty_str' is static, so it never
         * counts as a use of `pname'. */
        plugin_tree = pax_subtree_get_or_create(hosts_tree, hostname, NULL);
        if (plugin_tree == NULL)
                goto oom;

        pinstance_tree = pax_subtree_get_or_create(plugin_tree, plugin, &pname_used);
        if (pinstance_tree == NULL)
                goto oom;

        type_tree = pax_subtree_get_or_create(pinstance_tree, pinstance,
                        (pinstance != empty_str) ? &pname_used : NULL);
        if (type_tree == NULL)
                goto oom;

        tinstance_tree = pax_subtree_get_or_create(type_tree, type, &pname_used);
        if (tinstance_tree == NULL)
                goto oom;

        datasource_tree = pax_subtree_get_or_create(tinstance_tree, tinstance,
                        (tinstance != empty_str) ? &pname_used : NULL);
        if (datasource_tree == NULL)
                goto oom;

        if (pluginopt->children_num == 0)
        {
                /* No child option: assume that all data sources should be
                 * written. A positive result means "ALL" is already there. */
                if (c_avl_insert(datasource_tree, "ALL", NULL) < 0)
                        goto oom;
                DEBUG("pax write plugin: pax_config_add_plugin: No datasource, assuming all");
        }
        else if (pluginopt->children_num == 1)
        {
                oconfig_item_t *dsopt = pluginopt->children;

                /* The child has always been accepted regardless of its
                 * name; keep doing so, but point out a likely typo. */
                if (strcasecmp("DS", dsopt->key) != 0)
                        WARNING("pax write plugin: pax_config_add_plugin: Expected "
                                "option \"DS\", got \"%s\"; treating its values as "
                                "datasource names", dsopt->key);

                for (j = 0; j < dsopt->values_num; j++)
                {
                        char *ds;
                        int rc;

                        if (dsopt->values[j].type != OCONFIG_TYPE_STRING)
                        {
                                ERROR("pax write plugin: pax_config_add_plugin: "
                                      "Datasource names must be strings");
                                goto out;
                        }

                        ds = strdup(dsopt->values[j].value.string);
                        if (!ds)
                        {
                                char errbuf[1024];
                                ERROR("pax write plugin: pax_config_add_plugin: Can't allocate memory strdup() %s",
                                      sstrerror(errno, errbuf, sizeof(errbuf)));
                                goto out;
                        }
                        DEBUG("pax write plugin: pax_config_add_plugin: Found datasource %s", ds);

                        rc = c_avl_insert(datasource_tree, ds, NULL);
                        if (rc != 0)
                        {
                                /* Duplicate name (rc > 0) or failure
                                 * (rc < 0): the tree did not take `ds' */
                                free(ds);
                                if (rc < 0)
                                        goto oom;
                        }
                }
        }
        else
        {
                ERROR("pax write plugin: pax_config_add_plugin: Plugin block expect 0 or 1 options");
                goto out;
        }

        status = 0;
        goto out;

oom:
        ERROR("pax write plugin: pax_config_add_plugin: Can't allocate memory for AVL Trees");

out:
        /* `pname' backs the plugin/type/instance keys. It may only be freed
         * when none of them was inserted into a tree. */
        if (!pname_used)
                free(pname);
        return (status);
}

/*
 * Handles one <Host "name"> block: creates the top-level selection tree on
 * first use and registers every "Plugin" child of the block for that host.
 *
 * Ownership of the copied host name: the tree keeps the key pointer it is
 * given, and the name is only inserted when the host is not in the tree
 * yet. The copy is therefore released here unless this call was the one
 * that put the host into the tree.
 *
 * Parameters:
 *  hostopt  the "Host" configuration item; expects one string argument
 *
 * Returns 0 on success, -1 on error.
 */
static int pax_config_add_host (oconfig_item_t *hostopt)
{
        int i;
        int status = -1;
        int host_known;
        char *hostname;

        /* `.value' is a union; only read `.string' from string values */
        if ((hostopt->values_num != 1)
            || (hostopt->values[0].type != OCONFIG_TYPE_STRING))
        {
                ERROR("pax write plugin: pax_config_add_host: Host block "
                      "expects exactly one string argument");
                return (-1);
        }

        hostname = strdup(hostopt->values[0].value.string);
        if (!hostname)
        {
                char errbuf[1024];
                ERROR("pax write plugin: pax_config_add_host: Can't allocate memory: %s\n",
                      sstrerror (errno, errbuf,sizeof (errbuf)));
                return (-1);
        }

        if (!hosts_tree)
        {
                hosts_tree = C_AVL_CREATE_STRCMP();
                if (!hosts_tree)
                {
                        ERROR("pax write plugin: pax_config_add_host: Can't allocate memory for AVL Trees at %d",
                              __LINE__);
                        free(hostname);
                        return (-1);
                }
        }

        /* Remember whether an earlier <Host> block already registered this
         * host; if so the tree uses that block's key, not ours. */
        host_known = (c_avl_get(hosts_tree, hostname, NULL) == 0);

        for (i = 0; i < hostopt->children_num; i++)
        {
                oconfig_item_t *pluginopt = hostopt->children + i;

                if (strcasecmp("Plugin", pluginopt->key) != 0)
                {
                        /* Unknown options have always been skipped; keep
                         * that, but make the skip visible. */
                        WARNING("pax write plugin: pax_config_add_host: Ignoring "
                                "unknown option \"%s\" in Host block", pluginopt->key);
                        continue;
                }

                if (pluginopt->values_num != 1)
                {
                        ERROR("pax write plugin: pax_config_add_host: Plugin block expect exactly "
                              "one argument! %d given", pluginopt->values_num);
                        goto out;
                }

                if (pluginopt->values[0].type != OCONFIG_TYPE_STRING)
                {
                        ERROR("pax write plugin: pax_config_add_host: Plugin "
                              "argument must be a string");
                        goto out;
                }

                DEBUG("pax write plugin: pax_config_add_host: Found plugin %s",
                      pluginopt->values[0].value.string);

                if (pax_config_add_plugin(hostname, pluginopt, hosts_tree) != 0)
                        goto out;
        }

        status = 0;

out:
        /* Free our copy unless it became the tree key during this call,
         * i.e. the host was unknown before and is present now. */
        if (host_known || (c_avl_get(hosts_tree, hostname, NULL) != 0))
                free(hostname);
        return (status);
}

static int pax_config_complex (oconfig_item_t *ci)
{
        int i;

        for (i = 0; i < ci->children_num; i++)
        {
                oconfig_item_t *option = ci->children + i;
                if (strcasecmp ("DataDir", option->key) == 0) 
                {
                        if (datadir != NULL)
                                free (datadir);

                        datadir = strdup (option->values[0].value.string);
                        if (datadir != NULL && datadir[strlen(datadir) - 1] != '/')
                        {
                                ERROR("pax write plugin: pax_config: Directory path \
should end with a slash");  free (datadir);
                                datadir = NULL;
                                return (-1);
                        }
                        DEBUG("pax write plugin: pax_config: DataDir = '%s'", \
datadir);  }
                else if (strcasecmp ("StoreRates", option->key) == 0)
                {
                        if (option->values[0].value.boolean)
                                store_rates = 1;
                        else
                                store_rates = 0;
                        DEBUG("pax write plugin: pax_config: StoreRates = '%s'", \
(store_rates ? "true" : "false"));  }
                else if (strcasecmp ("FilePrefix", option->key) == 0)
                {
                        if (fileprefix != NULL)
                                free(fileprefix);
                
                        fileprefix = strdup(option->values[0].value.string);
                        DEBUG("pax write plugin: pax_config: FilePrefix = '%s'", \
fileprefix);  }
                else if (strcasecmp ("Host", option->key) == 0)
                {
                        int error;

                        if (option->values_num != 1)
                        {
                                ERROR("pax write plugin: pax_config: Host block \
                expect exactly "
                                      "one argument! %d given", option->values_num);
                                return (-1);
                        }

                        DEBUG("pax write plugin: pax_config: Found host %s", \
option->values[0].value.string);  error  = pax_config_add_host(option);
                        if (error) 
                                return (-1);
                }
                else
                        return (-1);

        }
        return (0);
} /* int pax_config_complex */                

/*
 * Write callback: appends the selected data sources of `vl' to the current
 * day's output file, one CSV record per line.
 *
 * Steps:
 *  1. format the records; value lists that were not selected in the
 *     configuration are dropped here, before any file work is done
 *  2. build the file name and create the file if it does not exist
 *  3. open the file for appending, take a write lock, write, close
 *
 * Parameters:
 *  ds         data set describing `vl'
 *  vl         values to write
 *  user_data  unused
 *
 * Returns 0 on success or when there is nothing to write, -1 on error.
 */
static int pax_write (const data_set_t *ds, const value_list_t *vl,
                      user_data_t __attribute__((unused)) *user_data)
{
        struct stat  statbuf;
        char         filename[512];
        char         values[4096];
        FILE        *pax_fp;
        int          pax_fd;
        struct flock fl;
        int          status;
        int          rval;

        if (0 != strcmp (ds->type, vl->type)) {
                ERROR ("pax plugin: DS type does not match value list type");
                return -1;
        }

        /* Without any <Host> block nothing is selected for writing */
        if (hosts_tree == NULL)
                return (0);

        /* Filter and format first: most value lists are usually not
         * selected, and this avoids building a file name (including a
         * localtime_r() call) for each of them. */
        rval = value_list_to_string (values, sizeof (values), ds, vl);
        if (rval < 0)
                return (-1);
        else if (rval > 0)
                return (0);

        if (value_list_to_filename (filename, sizeof (filename), fileprefix) != 0)
                return (-1);

        DEBUG ("pax write plugin: pax_write: filename = %s;", filename);

        if (stat (filename, &statbuf) == -1)
        {
                if (errno == ENOENT)
                {
                        if (pax_create_file (filename, ds))
                                return (-1);
                }
                else
                {
                        char errbuf[1024];
                        ERROR ("stat(%s) failed: %s", filename,
                                        sstrerror (errno, errbuf,
                                                sizeof (errbuf)));
                        return (-1);
                }
        }
        else if (!S_ISREG (statbuf.st_mode))
        {
                ERROR ("stat(%s): Not a regular file!",
                                filename);
                return (-1);
        }

        pax_fp = fopen (filename, "a");
        if (pax_fp == NULL)
        {
                char errbuf[1024];
                ERROR ("pax write plugin: fopen (%s) failed: %s", filename,
                                sstrerror (errno, errbuf, sizeof (errbuf)));
                return (-1);
        }
        pax_fd = fileno (pax_fp);

        /* Lock the whole file for writing (l_len == 0: till end of file) */
        memset (&fl, '\0', sizeof (fl));
        fl.l_start  = 0;
        fl.l_len    = 0;
        fl.l_pid    = getpid ();
        fl.l_type   = F_WRLCK;
        fl.l_whence = SEEK_SET;

        status = fcntl (pax_fd, F_SETLK, &fl);
        if (status != 0)
        {
                char errbuf[1024];
                ERROR ("pax plugin: flock (%s) failed: %s", filename,
                                sstrerror (errno, errbuf, sizeof (errbuf)));
                fclose (pax_fp);
                return (-1);
        }

        status = 0;
        if (fprintf (pax_fp, "%s\n", values) < 0)
        {
                char errbuf[1024];
                ERROR ("pax write plugin: fprintf (%s) failed: %s", filename,
                                sstrerror (errno, errbuf, sizeof (errbuf)));
                status = -1;
        }

        /* The lock is released implicitly by fclose(). It is not released
         * explicitly because the `FILE *' may need to flush its buffer
         * first. That flush is also where a write error (e.g. a full disk)
         * typically shows up, so the result of fclose() is checked. */
        if (fclose (pax_fp) != 0)
        {
                if (status == 0)
                {
                        char errbuf[1024];
                        ERROR ("pax write plugin: fclose (%s) failed: %s", filename,
                                        sstrerror (errno, errbuf, sizeof (errbuf)));
                }
                status = -1;
        }

        return (status);
} /* int pax_write */


/*
 * Plugin entry point: hooks the configuration callback and the write
 * callback into the daemon, both under the plugin name "pax_write".
 */
void module_register (void)
{
        static const char plugin_name[] = "pax_write";

        plugin_register_complex_config (plugin_name, pax_config_complex);
        plugin_register_write (plugin_name, pax_write, /* user_data = */ NULL);
} /* void module_register */



_______________________________________________
collectd mailing list
collectd@verplant.org
http://mailman.verplant.org/listinfo/collectd


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic