--- a/Makefile.am +++ b/Makefile.am @@ -1733,6 +1733,13 @@ pinba_la_LIBADD = $(BUILD_WITH_LIBPROTOBUF_C_LIBS) endif +if BUILD_PLUGIN_PORTCHECK +pkglib_LTLIBRARIES += portcheck.la +portcheck_la_SOURCES = src/portcheck.c +portcheck_la_LDFLAGS = $(PLUGIN_LDFLAGS) +portcheck_la_LIBADD = -lm +endif + if BUILD_PLUGIN_PING pkglib_LTLIBRARIES += ping.la ping_la_SOURCES = src/ping.c --- a/configure.ac +++ b/configure.ac @@ -7140,6 +7140,7 @@ # FIXME: Check for libevent, too. AC_PLUGIN([pinba], [$plugin_pinba], [Pinba statistics]) AC_PLUGIN([ping], [$with_liboping], [Network latency statistics]) +AC_PLUGIN([portcheck], [yes], [Network port monitor by CallTek]) AC_PLUGIN([postgresql], [$with_libpq], [PostgreSQL database statistics]) AC_PLUGIN([powerdns], [yes], [PowerDNS statistics]) AC_PLUGIN([processes], [$plugin_processes], [Process statistics]) --- /dev/null +++ b/src/portcheck.c @@ -0,0 +1,806 @@ +/** + * collectd - src/portcheck.c + * Copyright (C) 2005-2012 Florian octo Forster + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * András Otártics + **/ + +#include "collectd.h" + +#include "plugin.h" +#include "utils/common/common.h" +#include "utils_complain.h" + +#include +#if HAVE_NETDB_H +#include /* NI_MAXHOST */ +#endif + +#ifdef HAVE_SYS_CAPABILITY_H +#include +#endif + +#include +#include +#include + +#ifndef NI_MAXHOST +#define NI_MAXHOST 1025 +#endif + + +/* + * Private data types + */ +struct hostlist_s { + char *host; + char *ident; + + + char *ip; + uint32_t port; + bool tcp; // True: tcp; false: udp + // for now we wont support this, will need to rewrite config loading so + // it can be specified for each host + int af; // address family + double latency_total; + + int error; // when some system error happened, dont submit value + + struct hostlist_s *next; +}; +typedef struct hostlist_s hostlist_t; + +/* + * Private variables + */ +static hostlist_t *hostlist_head; + +static double portcheck_interval = 300.0; +static double portcheck_timeout = 5.0; + +static pthread_mutex_t portcheck_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t portcheck_cond = PTHREAD_COND_INITIALIZER; +static int portcheck_thread_loop; +static int portcheck_thread_error; +static pthread_t portcheck_thread_id; + +static const char *config_keys[] = {"Host", "SourceAddress", "AddressFamily", + "Size", "TTL", "Interval", + "Timeout", "MaxMissed"}; +static int config_keys_num = STATIC_ARRAY_SIZE(config_keys); + +/* + * Private functions + */ +/* Assure that `ts->tv_nsec' is in the range 0 .. 999999999 */ +static void time_normalize(struct timespec *ts) /* {{{ */ +{ + while (ts->tv_nsec < 0) { + if (ts->tv_sec == 0) { + ts->tv_nsec = 0; + return; + } + + ts->tv_sec -= 1; + ts->tv_nsec += 1000000000; + } + + while (ts->tv_nsec >= 1000000000) { + ts->tv_sec += 1; + ts->tv_nsec -= 1000000000; + } +} /* }}} void time_normalize */ + +/* Add `ts_int' to `tv_begin' and store the result in `ts_dest'. If the result + * is larger than `tv_end', copy `tv_end' to `ts_dest' instead. */ +static void time_calc(struct timespec *ts_dest, /* {{{ */ + const struct timespec *ts_int, + const struct timeval *tv_begin, + const struct timeval *tv_end) { + ts_dest->tv_sec = tv_begin->tv_sec + ts_int->tv_sec; + ts_dest->tv_nsec = (tv_begin->tv_usec * 1000) + ts_int->tv_nsec; + time_normalize(ts_dest); + + /* Assure that `(begin + interval) > end'. + * This may seem overly complicated, but `tv_sec' is of type `time_t' + * which may be `unsigned. *sigh* */ + if ((tv_end->tv_sec > ts_dest->tv_sec) || + ((tv_end->tv_sec == ts_dest->tv_sec) && + ((tv_end->tv_usec * 1000) > ts_dest->tv_nsec))) { + ts_dest->tv_sec = tv_end->tv_sec; + ts_dest->tv_nsec = 1000 * tv_end->tv_usec; + } + + time_normalize(ts_dest); +} /* }}} void time_calc */ + + +int connect_wait (int sockno, struct sockaddr * addr, size_t addrlen, struct timeval * timeout) +{ + int res, opt; + + // get socket flags + if ((opt = fcntl (sockno, F_GETFL, NULL)) < 0) { + ERROR("portcheck plugin: getting socket flags failed."); + return -1; + } + + // set socket non-blocking + if (fcntl (sockno, F_SETFL, opt | O_NONBLOCK) < 0) { + ERROR("portcheck plugin: setting socket flags failed."); + return -1; + } + + // try to connect + if ((res = connect (sockno, addr, addrlen)) < 0) { + if (errno == EINPROGRESS) { + fd_set wait_set; + + // make file descriptor set with socket + FD_ZERO (&wait_set); + FD_SET (sockno, &wait_set); + + // wait for socket to be writable; return after given timeout + res = select (sockno + 1, NULL, &wait_set, NULL, timeout); + } + } + // connection was successful immediately + else { + res = 1; + } + + // reset socket flags + if (fcntl (sockno, F_SETFL, opt) < 0) { + ERROR("portcheck plugin: resetting socket flags failed."); + return -1; + } + + // an error occured in connect or select + if (res < 0) { + ERROR("portcheck plugin: an error occured in connect or select."); + return -1; + } + // select timed out + else if (res == 0) { + errno = ETIMEDOUT; + return 1; + } + // almost finished... + else { + socklen_t len = sizeof (opt); + + // check for errors in socket layer + if (getsockopt (sockno, SOL_SOCKET, SO_ERROR, &opt, &len) < 0) { + ERROR("portcheck plugin: errors in the socket layer."); + return -1; + } + + // there was an error + if (opt==111 || opt==110) // 111: connection refused, which is OK with us; 110: timedout + return 1; + if (opt && opt!=111) { + errno = opt; + ERROR("portcheck plugin: error in the socket layer. %d ", errno); + return -1; + } + } + + return 0; +} // int connect_wait + +void* portcheck_send(void *arg) +{ + hostlist_t *hl = (hostlist_t*) arg; + + int sockfd; + struct sockaddr_in servaddr; + struct timeval timeout; + struct timeval tv_begin; + struct timeval tv_end; + + if(hl == NULL) + return NULL; + + timeout.tv_sec = floor(portcheck_interval); + timeout.tv_usec = (portcheck_interval - timeout.tv_sec) * 1000000; + + if(hl->tcp) + { + // socket create and verification + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd == -1) { + ERROR("portcheck plugin: creating socket failed."); + } + bzero(&servaddr, sizeof(servaddr)); + + if(hl->ip == NULL) + { + struct addrinfo* addr; + int result = getaddrinfo(hl->host, NULL, NULL, &addr); + if (result != 0) { + WARNING("portcheck plugin: getaddrinfo() dns lookup of %s failed with %d.", hl->host, result); + return NULL; + } + struct sockaddr_in* internet_addr = (struct sockaddr_in*) addr->ai_addr; + + hl->ip = strdup(inet_ntoa(internet_addr->sin_addr)); + if (hl->ip == NULL) { + ERROR("portcheck plugin: Setting ip of `%s' failed with %s", hl->host, STRERRNO); + return NULL; + } + + } + + // assign IP, PORT + servaddr.sin_family = AF_INET; + servaddr.sin_addr.s_addr = inet_addr(hl->ip); + servaddr.sin_port = htons(hl->port); + + if (gettimeofday(&tv_begin, NULL) < 0) { + ERROR("portcheck plugin: gettimeofday failed: %s", STRERRNO); + hl->error = 1; + return NULL ; + } + + int state = connect_wait(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr), &timeout); + + if (gettimeofday(&tv_end, NULL) < 0) { + ERROR("portcheck plugin: gettimeofday failed: %s", STRERRNO); + hl->error = 1; + return NULL ; + } + + double beg = tv_begin.tv_sec; + double end = tv_end.tv_sec; + + beg += (double)tv_begin.tv_usec / 1000000.0; + end += (double)tv_end.tv_usec / 1000000.0; + + hl->error = 0; + if(!state) + { + //successfully connected + hl->latency_total = end - beg; + } + else if(state == -1) + { + // system error + hl->error = 1; + return NULL ; + } + else // state == 1 timeout + { + hl->latency_total = NAN; + + } + + close(sockfd); + + } + else + { + WARNING("portcheck plugin: UDP not supported yet. "); + ; + } + + return NULL; +} // int portcheck_send + + +static void *portcheck_thread(void *arg) /* {{{ */ +{ + struct timeval tv_begin; + struct timeval tv_end; + struct timespec ts_wait; + struct timespec ts_int; + + int count; + +// c_complain_t complaint = C_COMPLAIN_INIT_STATIC; + + count = 0; + for (hostlist_t *hl = hostlist_head; hl != NULL; hl = hl->next) { + count++; + } + + if (count == 0) { + ERROR("portcheck plugin: No host could be added to portcheck object. Giving up."); + pthread_mutex_lock(&portcheck_lock); + portcheck_thread_error = 1; + pthread_mutex_unlock(&portcheck_lock); + return (void *)-1; + } + + /* Set up `ts_int' */ + { + double temp_sec; + double temp_nsec; + + temp_nsec = modf(portcheck_interval, &temp_sec); + ts_int.tv_sec = (time_t)temp_sec; + ts_int.tv_nsec = (long)(temp_nsec * 1000000000L); + } + + pthread_mutex_lock(&portcheck_lock); + while (portcheck_thread_loop > 0) { + + if (gettimeofday(&tv_begin, NULL) < 0) { + ERROR("portcheck plugin: gettimeofday failed: %s", STRERRNO); + portcheck_thread_error = 1; + break; + } + + pthread_mutex_unlock(&portcheck_lock); + + + int threadMax = 500; + long unsigned int tid[threadMax]; + int i; + + i=0; + for (hostlist_t *hl = hostlist_head; hl != NULL; hl = hl->next) { + + // test in batches + tid[i] = 0; + pthread_create(&tid[i], NULL, portcheck_send, (void *) hl); + if(!tid[i]) + { + ERROR("portcheck plugin: creating thread failed."); + continue; + } + + i++; + // max reacdhed, wait for threads to finish + if(i == threadMax) + { + for (int k = 0; k < threadMax; k++) + { + pthread_join(tid[k], NULL); + } + i=0; + } + + } + // max was not reached, wait for threads to finish + if(i) + { + for (int k = 0; k < i; k++) + pthread_join(tid[k], NULL); + } + + pthread_mutex_lock(&portcheck_lock); + + if (portcheck_thread_loop <= 0) + break; + + if (gettimeofday(&tv_end, NULL) < 0) { + ERROR("portcheck plugin: gettimeofday failed: %s", STRERRNO); + portcheck_thread_error = 1; + break; + } + + /* Calculate the absolute time until which to wait and store it in + * `ts_wait'. */ + time_calc(&ts_wait, &ts_int, &tv_begin, &tv_end); + + pthread_cond_timedwait(&portcheck_cond, &portcheck_lock, &ts_wait); + if (portcheck_thread_loop <= 0) + break; + } /* while (portcheck_thread_loop > 0) */ + + pthread_mutex_unlock(&portcheck_lock); + + return (void *)0; +} /* }}} void *portcheck_thread */ + +static int start_thread(void) /* {{{ */ +{ + int status; + + pthread_mutex_lock(&portcheck_lock); + + if (portcheck_thread_loop != 0) { + pthread_mutex_unlock(&portcheck_lock); + return 0; + } + + portcheck_thread_loop = 1; + portcheck_thread_error = 0; + status = plugin_thread_create(&portcheck_thread_id, portcheck_thread, + /* arg = */ (void *)0, "portcheck"); + if (status != 0) { + portcheck_thread_loop = 0; + ERROR("portcheck plugin: Starting thread failed."); + pthread_mutex_unlock(&portcheck_lock); + return -1; + } + + pthread_mutex_unlock(&portcheck_lock); + return 0; +} /* }}} int start_thread */ + +static int stop_thread(void) /* {{{ */ +{ + int status; + + pthread_mutex_lock(&portcheck_lock); + + if (portcheck_thread_loop == 0) { + pthread_mutex_unlock(&portcheck_lock); + return -1; + } + + portcheck_thread_loop = 0; + pthread_cond_broadcast(&portcheck_cond); + pthread_mutex_unlock(&portcheck_lock); + + status = pthread_join(portcheck_thread_id, /* return = */ NULL); + if (status != 0) { + ERROR("portcheck plugin: Stopportcheck thread failed."); + status = -1; + } + + pthread_mutex_lock(&portcheck_lock); + memset(&portcheck_thread_id, 0, sizeof(portcheck_thread_id)); + portcheck_thread_error = 0; + pthread_mutex_unlock(&portcheck_lock); + + return status; +} /* }}} int stop_thread */ + +static int portcheck_init(void) /* {{{ */ +{ + if (hostlist_head == NULL) { + NOTICE("portcheck plugin: No hosts have been configured."); + return -1; + } + + if (portcheck_timeout > portcheck_interval) { + portcheck_timeout = 0.9 * portcheck_interval; + WARNING("portcheck plugin: Timeout is greater than interval. " + "Will use a timeout of %gs.", + portcheck_timeout); + } + + + return start_thread(); +} /* }}} int portcheck_init */ + +static int config_set_string(const char *name, /* {{{ */ + char **var, const char *value) { + char *tmp; + + tmp = strdup(value); + if (tmp == NULL) { + ERROR("portcheck plugin: Setting `%s' to `%s' failed: strdup failed: %s", name, + value, STRERRNO); + return 1; + } + + if (*var != NULL) + free(*var); + *var = tmp; + return 0; +} /* }}} int config_set_string */ + + +static int portcheck_config_old(const char *key, const char *value) /* {{{ */ +{ + if (strcasecmp(key, "Host") == 0) { + hostlist_t *hl; + char *host; + + hl = malloc(sizeof(*hl)); + if (hl == NULL) { + ERROR("portcheck plugin: malloc failed: %s", STRERRNO); + return 1; + } + + host = strdup(value); + if (host == NULL) { + sfree(hl); + ERROR("portcheck plugin: strdup failed: %s", STRERRNO); + return 1; + } + + hl->host = host; + hl->latency_total = -2.0; + hl->next = hostlist_head; + hostlist_head = hl; + } else if (strcasecmp(key, "AddressFamily") == 0) { + char *af = NULL; + int status = config_set_string(key, &af, value); + if (status != 0) + return status; + +/* if (strncmp(af, "any", 3) == 0) { + portcheck_af = AF_UNSPEC; + } else if (strncmp(af, "ipv4", 4) == 0) { + portcheck_af = AF_INET; + } else if (strncmp(af, "ipv6", 4) == 0) { + portcheck_af = AF_INET6; + } else { + WARNING("portcheck plugin: Ignoring invalid AddressFamily value %s", af); + } + free(af); +*/ +/* } else if (strcasecmp(key, "SourceAddress") == 0) { + int status = config_set_string(key, &portcheck_source, value); + if (status != 0) + return status; + } +#ifdef HAVE_OPING_1_3 + else if (strcasecmp(key, "Device") == 0) { + int status = config_set_string(key, &portcheck_device, value); + if (status != 0) + return status; + } +#endif + else if (strcasecmp(key, "TTL") == 0) { + int ttl = atoi(value); + if ((ttl > 0) && (ttl <= 255)) + portcheck_ttl = ttl; + else + WARNING("portcheck plugin: Ignoring invalid TTL %i.", ttl); + */ + } else if (strcasecmp(key, "Interval") == 0) { + double tmp; + + tmp = atof(value); + if (tmp > 0.0) + portcheck_interval = tmp; + else + WARNING("portcheck plugin: Ignoring invalid interval %g (%s)", tmp, value); + } else if (strcasecmp(key, "Timeout") == 0) { + double tmp; + + tmp = atof(value); + if (tmp > 0.0) + portcheck_timeout = tmp; + else + WARNING("portcheck plugin: Ignoring invalid timeout %g (%s)", tmp, value); +/* } else if (strcasecmp(key, "MaxMissed") == 0) { + portcheck_max_missed = atoi(value); + if (portcheck_max_missed < 0) + INFO("portcheck plugin: MaxMissed < 0, disabled re-resolving of hosts"); + */} else { + return -1; + } + + return 0; +} /* }}} int portcheck_config */ + +static int portcheck_config_add_host(oconfig_item_t *ci) { + + hostlist_t *hl; + char *host; + char *ident; + int port; + bool isTcp = true; + + ident = (char *)malloc(512); + if (ident == NULL) { + ERROR("portcheck plugin: malloc (ident) failed: %s", STRERRNO); + return 1; + } + + host = (char *)malloc(512); + if (host == NULL) { + ERROR("portcheck plugin: malloc (ident) failed: %s", STRERRNO); + return 1; + } + + int status = cf_util_get_string(ci, &ident); + if (status != 0) { + return -1; + } + + hl = malloc(sizeof(*hl)); + if (hl == NULL) { + ERROR("portcheck plugin: malloc failed: %s", STRERRNO); + return 1; + } + + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *option = ci->children + i; + + if (strcasecmp("Host", option->key) == 0) + status = cf_util_get_string(option, &host); + else if (strcasecmp("Port", option->key) == 0) + status = cf_util_get_int(option, &port); + else if (strcasecmp("TCP", option->key) == 0) + status = cf_util_get_boolean(option, &isTcp); + else { + WARNING("portcheck plugin: data %s: Option `%s' not allowed here.", ident, + option->key); + status = -1; + } + + if (status != 0) + { + if(host != NULL) sfree(host); + if(ident != NULL) sfree(ident); + sfree(hl); + return -1; + } + + } + + if (host == NULL) { + sfree(hl); + ERROR("portcheck plugin: strdup failed: %s", STRERRNO); + return 1; + } + + hl->host = host; + hl->ident = ident; + hl->port = port; + hl->tcp = isTcp; + hl->ip = NULL; + hl->error=0; + hl->latency_total = -2.0; + hl->next = hostlist_head; + hostlist_head = hl; + + return 0; + +} /* }}} int portcheck_config_add_host() */ + +static int portcheck_config(oconfig_item_t *ci) { + + int status; + + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + if (strcasecmp("Host", child->key) == 0 && child->values_num > 0) { + status = portcheck_config_add_host(child); + } + else { + // Process all old key+value pairs + // plugin_register_config("portcheck", portcheck_config, config_keys, config_keys_num); + + for(int ii = 0; i < config_keys_num; i++) { + if( strcasecmp(child->key, config_keys[ii]) ) { + if (child->values_num != 1 ) { + WARNING("portcheck plugin: %s can have only 1 string argument.", child->key); + break; + } + if (child->values[0].type != OCONFIG_TYPE_STRING) { + WARNING("portcheck plugin: %s needs only string argument.", child->key); + break; + } + + status = portcheck_config_old(child->key, child->values[0].value.string); + if(status != 0) + break; + + } + } + } + if(status != 0) + return -1; + + } /* for (ci->children) */ + + return 0; + +} /* int portcheck.config */ + + + +static void submit(const char *host, const char *ident, int port, const char *type, /* {{{ */ + gauge_t value) { + value_list_t vl = VALUE_LIST_INIT; + char str[30]; + sprintf(str, "%d", port); + + vl.values = &(value_t){.gauge = value}; + vl.values_len = 1; + sstrncpy(vl.plugin, "portcheck", sizeof(vl.plugin)); + sstrncpy(vl.type_instance, str, sizeof(vl.type_instance)); + sstrncpy(vl.type, type, sizeof(vl.type)); + sstrncpy(vl.host, ident, sizeof(vl.host)); + + plugin_dispatch_values(&vl); +} /* }}} void portcheck_submit */ + +static int portcheck_read(void) /* {{{ */ +{ + if (portcheck_thread_error != 0) { + ERROR("portcheck plugin: The portcheck thread had a problem. Restarting it."); + + stop_thread(); + + for (hostlist_t *hl = hostlist_head; hl != NULL; hl = hl->next) { + hl->latency_total = -2.0; + } + + start_thread(); + + return -1; + } /* if (portcheck_thread_error != 0) */ + + for (hostlist_t *hl = hostlist_head; hl != NULL; hl = hl->next) /* {{{ */ + { + double latency_total; + + /* Locking here works, because the structure of the linked list is only + * changed during configure and shutdown. */ + pthread_mutex_lock(&portcheck_lock); + + if(hl->error) + { + WARNING("portcheck plugin: error for host %s.", hl->host); + + continue; + } + latency_total = hl->latency_total; + + hl->latency_total = -1.0; + + pthread_mutex_unlock(&portcheck_lock); + + + // Most likely the startup + if (latency_total < 0 && latency_total != NAN) { + WARNING("portcheck plugin: No packages for host %s have been sent.", hl->host); + latency_total = NAN; + } + + submit(hl->host, hl->ident, hl->port, "portcheck", latency_total); + } /* }}} for (hl = hostlist_head; hl != NULL; hl = hl->next) */ + + return 0; +} /* }}} int portcheck_read */ + +static int portcheck_shutdown(void) /* {{{ */ +{ + hostlist_t *hl; + + INFO("portcheck plugin: Shutting down thread."); + if (stop_thread() < 0) + return -1; + + hl = hostlist_head; + while (hl != NULL) { + hostlist_t *hl_next; + + hl_next = hl->next; + + sfree(hl->host); + sfree(hl->ident); + sfree(hl); + + hl = hl_next; + } + + return 0; +} /* }}} int portcheck_shutdown */ + +void module_register(void) { +// plugin_register_config("portcheck", portcheck_config, config_keys, config_keys_num); + plugin_register_complex_config("portcheck", portcheck_config); + plugin_register_init("portcheck", portcheck_init); + plugin_register_read("portcheck", portcheck_read); + plugin_register_shutdown("portcheck", portcheck_shutdown); +} /* void module_register */