Monitorizar tráfico de red en VMware vSphere con Netflow y ELK

El objetivo de la entrada es conocer la funcionalidad Netflow de un switch distribuido de la infraestructura de VMware y utilizar de forma básica una infraestructura con ElasticSearch, Logstash y Kibana (ELK) para guardar y explotar la información generada.

 

Esquema básico

De forma resumida, este es el esquema de la solución que vamos a implantar:

  • Infraestructura VMware vSphere con varias máquinas virtuales, ubicadas en diferentes redes de un switch distribuido compartido por varios servidores ESXi
  • Un servidor con los componentes de ELK Stack (Logstash, Elasticsearch y Kibana)

 

Elementos clave del diseño

Para poder comprender y conocer los diferentes elementos del diseño, su funcionalidad y su funcionamiento vamos a ver una pequeña descripción de cada uno de ellos:

  • Switch Distribuido: es uno de los dos tipos de vSwitch que ofrece VMware en su infraestructura vSphere (junto al vSwitch Standard) Es imprescindible utilizarlo ya que este tiipo de switch es el único que proporciona la funcionalidad Netflow.

  • Netflow: es un protocolo de red desarrollado por Cisco que permite recolectar información sobre el tráfico IP de un elemento de red. Se ha convertido en un estándar y es utilizado por la mayoría de los fabricantes, no solo por Cisco. Con este protocolo podemos obtener información como la dirección IP origen, la dirección IP destino, los puertos utilizados, el interfaz utilizado…
    • Versiones: a lo largo de los años han existido diferentes versiones de Netflow, las más importantes son
      • 1, 2, 3 4: fueron versiones internas de Cisco
      • 5: una de las versiones con mayor éxito, pero limitada a IPv4
      • 6, 7, 8: añaden algunos campos a la versión 5
      • 9: junto a la versión 5, la más utilizada, con soporte IPv4, IPv6, MPLS… (https://tools.ietf.org/html/rfc3954)
      • 10: utilizado para identificar IPFIX, aunque no es una versión de Netflow como tal.

  • IPFIX: es un protocolo de red definido por la IETF, que basado en la versión 9 de Netflow, define el estándar para exportar información sobre los flujos de red en elementos de red. (https://tools.ietf.org/html/rfc7011)

 

  • ELK Stack (o Elastic Stack) es la forma en la que se conoce la infraestructura basada en Elasticsearch, Logstash y Kibana
    • ElasticSearch: es el motor de búsqueda basado en Apache Lucene. En la infraestructura, se encarga de almacenar, analizar y permitir la búsqueda de los datos.
          • Near Realtime (NRT): Elasticsearch is una plataforma cercana al tiempo real (NRT) Esto significa que hay una latencia (normalmente de un segundo) desde el momento en el que se indexa un documento hasta que es accesible.
          • Cluster: es una colección  de uno o más nodos (servidores) encargados de almacenar y los datos y proporcionar las funcionalidades de indexado y búsqueda a través de los nodos. Es identificado por un nombre único (por defecto «elasticsearch») que es utilizado por los nodos para configurar la pertenencia al cluster, por lo que es importante no reutilizar el nombre en diferentes clusters. Es posible utilizar un cluster con un único nodo.
          • Node: es un servidor que forma parte de un cluster, almacena datos y participa en las tareas de indexado y búsqueda del cluster. Como el cluster, es identificado de forma única por un nombre.
          • Index: un índice (index) es una colección de documentos que tienen características similares. Por ejemplo, podemos tener un indice para datos de clientes, otro para datos del catálogo de productos y otro indice para los pedidos. Un índice es identificado por un nombre que es utilizado para las operaciones de búsqueda, actualizaciones y borrado de los documentos. Se pueden definir tantos indices como se quiera en un cluster.
          • Document: un documento es la unidad básica de información que puede ser indexada. Por ejemplo, un documento es un cliente, otro documento es un producto y otro documento es un pedido. El documento está representado por un objeto JSON (JavaScript Object Notation) Un documento se almacena en un índice
          • Shards & Replicas: potencialmente, un índice puede almacenar una gran cantidad de datos que pueden exceder los limites hardware de un nodo. Por ejemplo, un indice puede contener documentos que ocupan 1TB de espacio en disco y puede no ser suficiente la capacidad de un nodo o puede ser lento que lo sirva un único nodo. Para resolver este problema, Elasticsearch proporciona la posibilidad de dividir un índice en múltiples partes, llamadas shards. Cuando se crea un índice, de puede definir el número de shards que se desee. Cada shard, es un «índice» independiente totalmente funcional que puede ser almacenado en cualquier nodo del cluster. Un shard es importante porque:
            • Permite dividir el contenido horizontalmente en varios nodos
            • Permite distribuir o paralelizar las operaciones de un índice en varios shards (y potencialmente en varios nodos) incrementando el rendimiento.

            El mecanismo por el que los shard y los documentos son distribuidos es gestionado por Elasticsearch de forma transparente para el usuario.

            También es importante disponer de un mecanismo por el cual la infraestructura no se vea afectada en caso de que un shard/nodo falle o tenga un problema. Para ello Elasticsearch permite almacenar una o más copias de los shard, lo que se llama replicas. Las replicas son importantes porque:

              • Proporcionan alta disponibilidad en el caso de que un shard/nodo falle. Una réplica nunca se ubica en el mismo nodo que el shard original o primario del que es copia
              • Permite escalar las busquedas ya que pueden ser ejecutadas en paralelo en todas las réplicas

            En resumen, cada indice de puede dividir en múltiples shards. Un indice puede tener cero (no tiene réplicas) o más réplicas. Una vez definido el número de replicas, cada indice tiene primary shards (shad original) y replica shards. (copias de los primary shards) El número de shards y replicas puede ser definido por cada index en el momento que es creado. Después de la creación, se puede cambiar el número de réplicas en cualquier momento, pero no se puede cambiar el número de shards. Por defecto, cada indice tiene 5 primary shards y 1 replica (por lo que en total tendremos 10 shards por índice)

 

    • Logstash: es el componente que permite recolectar datos de forma que puede recoger información de diferentes canales, analizarla  guardarla en el destino que se elija. Aunque inicialmente se desarrollo con el objetivo de recolectar logs, ha ido evolucionando permitiendo trabajar con diferentes tipos de fuentes, permite leer logs de Apache, capturar logs en formato syslog, logs de cortafuegos, netflow… Permite el uso de plugins para aumentar su funcionalidad en los 3 principales aspectos en los que trabaja
      • Input: recogida de datos
      • Filter: procesado de datos
      • Output: almacenado de los datos
    • Kibana: es la herramienta web que permite la visualización y análisis de datos almacenados en índices de Elasticsearch. Los datos se pueden visualizar en un variedad de tipos de gráficos, tablas o mapas.
    • Beats: son agentes que se instalan en los servidores o equipos y que se encargan de recopilar diferentes tipos de datos. Existen diferentes agentes para recolectar logs, eventos, métricas, datos de red…
    • X-Pack: es una extensión de Elastic Stack que proporciona funcionalidades de seguridad, monitorización, alertas, informes. Hay que tener en cuenta que estos componentes tienen que licenciarse. Existen diferentes versiones con diferentes funcionalidades, como se puede ver en esta tabla. En esta entrada no vamos a utilizarlo.

 

Requerimientos

Para realizar la instalación y configuración de la infraestructura necesitamos disponer de:

  • Una infraestructura de VMware vSphere utilizando una licencia vSphere Enterprise Plus en los servidores ESXi
  • Al menos un dvSwitch distribuido creado y configurado
  • Un servidor donde instalaremos ELK Stack
  • Conectividad entre los servidores ESXi y el servidor CentOS 7

 

Configuración del dvSwitch en VMware vSphere

Partimos de un switch distribuido ya creado, con los host, los interfaces físicos y algunos grupos de puertos ya creados y configurados. Para habilitar y configurar el uso de Netflow en un switch distribuido, realizamos básicamente 2 pasos:

  • Configuración del servidor netflow al que vamos a enviar la información
  • Configuración de los grupos de puertos, o puertos que tienen habilitado netflow

 

Configuración del servidor Netflow

Nota: La configuración la voy a realizar desde el cliente «vSphere Client Fling version 3.29.0.7157335» aprovechando que ya dispone de esta funcionalidad.

  • Accedemos al menú del switch distribuido: Settings -> Edit Netflow …

  • Rellenamos los campos:
    • Collector IP address: será la IP del servidor que va a  recibir la información
    • Collector port: puerto de destino
    • Observation Domain ID: información para identificar el switch
    • Switch IP Address: dirección IP que aparecerá como host que envía la información. De esta forma aparecen todos los envíos desde un mismo punto. Si no se indica y se deja en blanco, aparecerá la IP de cada servidor Host
    • Active flow export timeout (Seconds): el tiempo que espera en segundos antes de envía la información de un flujo activo
    • Idle flow export timeout (Seconds): el tiempo que espera en segundos antes de envía la información de un flujo no activo
    • Sampling rate: permite definir el número de paquetes que se eliminan del flujo por cada uno que se captura. Si se pone un valor de 0, indica que todos los paquetes se capturan. Un valor de 1, indica que por cada paquete que se captura se descarta otro. Un valor de 2, indica que por cada paquete que se captura se descartan 2 y así sucesivamente…
    • Process internal flows only: si está habilitado únicamente se procesan los flujos entre máquinas virtuales, no con equipos externos

 

 

Una vez que hemos rellenado los datos y guardados los cambios, tenemos que configurar los puntos en los que se va a capturar el tráfico, que pueden ser:

  • Interfaces Uplink
  • Grupo de puertos
  • Puerto individual

En cualquiera de los 3 casos, accedemos a las propiedades del objeto y habilitamos la monitorización con Netflow:

  • Dvuplink

  • Port Group

  • Port

 

Cuando habilitemos los objetos en los que se va a monitorizar el tráfico tenemos que tener cuidado y conocer nuestra infraestructura para evitar la captura duplicada de los flujos generados por las diferentes máquinas virtuales.

 

 

Instalación de servidor ELK

Vamos a comenzar realizando la instalación de todos los componentes del stack ELK en un único servidor, de forma que el cluster Elasticsearch (que estará formado por un único nodo), Logstash y Kibana se ejecutarán en la misma máquina.

 

Instalación y preparación del sistema operativo

Podemos ver las versiones de Sistema Operativo soportadas en la tabla de soporte

En mi caso voy a utilizar una máquina con CentOS 7.4 con una instalación estándar y básica, con las siguientes características hardware

  • CPU: 4
  • RAM: 10GB
  • Disco duro: 40GB
  • 1 interfaz de red
  • Nombre: SRV-ELK-002
  • Dirección IP: 172.16.1.61/24

 

Instalación de Java

Elasticsearch utiliza Java y requiere al menos Java 8 para poder ejecutarse. En esta tabla, podemos ver las implementaciones de Java soportadas. Se debería de utilizar la misma versión en todos los nodos. Como está soportada, vamos a utilizar la versión de Java de OpenJDK

yum install java-1.8.0-openjdk -y

Comprobamos el acceso a java y la versión instalada:

yum list installed | grep java-1.8.0-openjdk.x86_64
java-1.8.0-openjdk.x86_64             1:1.8.0.151-1.b12.el7_4    @updates

java -version
openjdk version "1.8.0_151"
OpenJDK Runtime Environment (build 1.8.0_151-b12)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)

 

Añadir repositorio de Elastic

Añadimos el repositorio de Elastic desde el cual instalaremos los 3 componentes principales del servidor (ElasticSearch, LogStash y Kibana)

  • Importamos la clave pública de los paquetes
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
  • Creamos el archivo /etc/yum.repos.d/elastic.repo con el siguiente contenido
[elasticsearch-6.x]
name=Elasticsearch repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

De esta forma tenemos disponibles los paquetes de Elasticsearch 6.x para poder instalarlos en nuestro servidor

 

Instalación de Elasticsearch

  • Instalamos el paquete de Elasticsearch.
yum install elasticsearch -y
  • En el momento de escribir esta entrada se instala la versión 6.0.0-1
=======================================================================================================================================================
 Package                               Arch                           Version                          Repository                                 Size
=======================================================================================================================================================
Installing:
 elasticsearch                         noarch                         6.0.0-1                          elasticsearch-6.x                          27 M

Transaction Summary
=======================================================================================================================================================
Install  1 Package
  • Habilitamos el servicio con systemctl
systemctl daemon-reload
systemctl enable elasticsearch

De momento, no arrancamos el servicio, primero vamos a realizar la configuración inicial.

 

Configuración inicial de Elasticsearch

Principales directorios:

  • /usr/share/elasticsearch: es el directirio base de la instalación
  • /usr/share/elasticsearch/bin: contiene los binarios
  • /usr/share/elasticsearch/plugins: directorio donde se ubican los plugins
  • /etc/elasticsearch: contiene los archivos de configuración
  • /etc/sysconfig/elasticsearch: contiene la definición de variables
  • /var/lib/elasticsearch: ubicación donde se almacenan los datos
  • /var/log/elaticsearch: ubicación de los logs

 

Elastisearch por defecto utiliza el directorio /etc/elasticsearch para almacenar los archivos de configuración. El propietario de este directorio y todos los ficheros que se encuentran en el es root:elasticsearch. Además se ha establecido el flag setgid, para que se mantenga esta propiedad en los ficheros y subdirectorios que se vayan creando en /etc/elasticsearch.

El archivo de configuración por defecto es /etc/elasticsearch/elasticsearch.yml (más adelante veremos las modificaciones a realizar en este archivo)

También existe el archivo /etc/sysconfig/elasticsearch en el que se puede definir los siguientes parámetros:

################################
# Elasticsearch
################################

# Elasticsearch home directory
#ES_HOME=/usr/share/elasticsearch

# Elasticsearch Java path
#JAVA_HOME=

# Elasticsearch configuration directory
ES_PATH_CONF=/etc/elasticsearch

# Elasticsearch PID directory
#PID_DIR=/var/run/elasticsearch

# Additional Java OPTS
#ES_JAVA_OPTS=

# Configure restart on package upgrade (true, every other setting will lead to not restarting)
#RESTART_ON_UPGRADE=true

################################
# Elasticsearch service
################################

# SysV init.d
#
# The number of seconds to wait before checking if Elasticsearch started successfully as a daemon process
ES_STARTUP_SLEEP_TIME=5

################################
# System properties
################################

# Specifies the maximum file descriptor number that can be opened by this process
# When using Systemd, this setting is ignored and the LimitNOFILE defined in
# /usr/lib/systemd/system/elasticsearch.service takes precedence
#MAX_OPEN_FILES=65536

# The maximum number of bytes of memory that may be locked into RAM
# Set to "unlimited" if you use the 'bootstrap.memory_lock: true' option
# in elasticsearch.yml.
# When using Systemd, the LimitMEMLOCK property must be set
# in /usr/lib/systemd/system/elasticsearch.service
#MAX_LOCKED_MEMORY=unlimited

# Maximum number of VMA (Virtual Memory Areas) a process can own
# When using Systemd, this setting is ignored and the 'vm.max_map_count'
# property is set at boot time in /usr/lib/sysctl.d/elasticsearch.conf
#MAX_MAP_COUNT=262144

 

Vamos a editar el archivo /etc/elasticsearch/elasticsearch.yml y configurar los siguientes valores:

  • cluster.name: nombre del cluster
  • node.name: nombre del nodo
  • network.host: dirección en la que escucha el servicio
  • http.port: puerto de escucha
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: ELK-002
#


# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: SRV-ELK-002
#

# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
network.host: 172.16.1.61
#
# Set a custom port for HTTP:
#
http.port: 9200
#

Hay otros parámetros que nos permiten configurar el comportamiento del descubrimiento de otros nodos del cluster o el bloqueo de memoria para no» swappear», pero en mi caso, como prueba de concepto, sólo voy a modificar los 4 parámetros indicados. Podría haber dejado la configuración de los parámetros de escucha por defecto para que sólo se tuviese acceso desde el propio servidor, pero de esta forma me permite también poder hacer consultas externas al API de ElasticSearch.

 

Configuración del sistema

Es necesario configurar algunos parámetros del sistema operativo:

  • Set JVM heap size: por defecto, Elasticsearch le dice a Java que utilice un mínimo y un máximo de 1GB de heap en memoria.
    • Se recomienda establecer un valor mínimo igual al valor máximo
    • Cuanta más memoria disponible, mejor
    • Establecer el máximo a un valor no mayor al 50% de la memoria RAM física

Para configurar este valore editamos el archivo /etc/elasticsearch/jvm.options

## JVM configuration

################################################################
## IMPORTANT: JVM heap size
################################################################
##
## You should always set the min and max JVM heap
## size to the same value. For example, to set
## the heap to 4 GB, set:
##
## -Xms4g
## -Xmx4g
##
## See https://www.elastic.co/guide/en/elasticsearch/reference/current/heap-size.html
## for more information
##
################################################################

# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space

-Xms4g
-Xmx4g

  • Disable swapping: en el caso de utilizar swap de disco es posible que el rendimiento se vea afectado, por lo que es necesario evitarlo a toda costa. Podemos utilizar una de las siguientes 3 opciones:
    • Deshabilitar todos los archivos swap
swapoff -a
    •  Configurar swappiness: establece el balance de uso que hace Linux entre la memoria RAM y la swap de disco
Swappinness 0               RAM 100% –  SWAP 0%
Swappinness 35              RAM 65%  –  SWAP 35%
Swappinness 60              RAM 40%  –  SWAP 60%
Swappinness 100             RAM 0%   –  SWAP 100%

Podemos modificar el valor en el archivo /etc/sysctl.conf

vm.swappiness = 1

 

    • Habilitar bootstrap.memory_lock: permite bloquear  la memoria de un proceso  en memoria. En este caso lo habilitamos en el archivo /etc/elasticsearch/elasticsearch.yml
bootstrap.memory_lock: true

 

  • Increase file descriptors: Elasticsearch utiliza muchos archivos por lo que es necesario que no llegue al límite del número de archivos que puede tener abiertos en el sistema. En el caso de una instalación RPM com hemos hecho nosotros, ya se ha definido el máximo a 65536, en otros casos podríamos definirlo con ulimit.

 

  • Ensure sufficient virtual memory: Elasticsearch utiliza nmapfs para almacenar los indices, esto es, los índices almacenados en el sistema de archivos son mapeados a un archivo en memoria (nmap) por lo que es necesario disponer de la memoria virtual suficiente. Se puede gestionar con el parámetro vm.max_map_count en el archivo /etc/sysctl.conf. Con RPM ya se ha establecido este valor por lo que no es necesario modificarlo.

 

  • Ensure sufficient threads: asegurarnos que el número de hilos que el usuario de Elaticsearch es al menos 2048. Lo podemos definir con ulimit.

 

Por defecto, Elasticsearch asume que estamos trabajando en modo «development». En este modo, si alguna de las 5 configuraciones anteriores es incorrecta, se muestra un warning, pero el servicio arranca correctamente. Si se ha indicado el parámetro «network.host» Elasticsearch asume que se ha pasado a modo «production. En este caso, si alguna de las 5 configuraciones anteriores no es correcta, se muestra una excepción y no arranca el servicio.

 

Permitimos el tráfico  en el puerto 9200 en el cortafuegos del servidor

firewall-cmd --permanent --add-port=9200/tcp
firewall-cmd --reload

Arrancamos el servicio

systemctl start elasticsearch

 

Comprobación del estado de Elasticsearch

Lo primero que vamos a hacer es comprobar que el servicio está activo y funcionando

[root@srv-elk-002 ~]# systemctl status elasticsearch
● elasticsearch.service - Elasticsearch
   Loaded: loaded (/usr/lib/systemd/system/elasticsearch.service; enabled; vendor preset: disabled)
   Active: active (running) since Mon 2017-11-27 19:01:57 CET; 54s ago
     Docs: http://www.elastic.co
 Main PID: 1599 (java)
   CGroup: /system.slice/elasticsearch.service
           └─1599 /bin/java -Xms1g -Xmx1g -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+Alwa...

Nov 27 19:01:57 srv-elk-002 systemd[1]: Started Elasticsearch.
Nov 27 19:01:57 srv-elk-002 systemd[1]: Starting Elasticsearch...

 

Utilizando REEST API vamos a comprobar el estado del cluster y de varios de los componentes que lo forman. Para ello podemos utilizar un navegador o desde línea de comandos con curl

  • Si utilizamos un navegador y nos conectamos al puerto 9200 comprobamos que el servidor nos responde (http://172.16.1.61:9200)
{
  "name" : "SRV-ELK-002",
  "cluster_name" : "ELK-002",
  "cluster_uuid" : "OvHuVdRLSd-HHLWBhY2tKQ",
  "version" : {
    "number" : "6.0.0",
    "build_hash" : "8f0685b",
    "build_date" : "2017-11-10T18:41:22.859Z",
    "build_snapshot" : false,
    "lucene_version" : "7.0.1",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}

Estado del cluster

  • Comprobamos el estado del cluster accediendo a: http://172.16.1.61:9200/_cat/health?v (Ayuda)
epoch      timestamp cluster status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent
1511805922 19:05:22  ELK-002 green           1         1      0   0    0    0        0             0                  -                100.0%

Podemos ver los principales datos que se nos muestran:

  • epoch y timestamp: indica la hora actual de la consulta
  • cluster: indica el nombre del cluster
  • status: indcia el estado de salud del cluster
    • greeen: todo está correcto (el cluster es totalmente funcional)
    • yellow: todos los datos están disponibles pero algunas de las réplicas no se han ubicado en los nodos (el cluster es totalmente funcional)
    • Red: hay datos no disponibles por alguna razón (el cluster no es totalmente funcional) El cluster puede servir datos de los shards disponibles pero hay que solucionar el problema lo antes posible
  • node.total: número total de nodos
  • node.data: número de nodos que almacenan datos
  • shards: número de shards
  • pri: número de shards primarios

 

Estado de los nodos

  • Comprobamos el estado de los nodos accediendo a: http://172.16.1.61:9200/_cat/nodes?v (Ayuda)
ip          heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name
172.16.1.61           2          50   2    0.00    0.02     0.05 mdi       *      SRV-ELK-002

Podemos ver los principales datos que se nos muestran:

  • ip: indica la dirección ip del nodo
  • heap.percent: indica el porcentaje de heap de memoria utilizado
  • ram.percent: indica el porcentaje de memoria utilizado por el servidor
  • cpu: indica el uso de CPU
  • load_1m, load_5m, load_15m: muestra el indice de carga del servidor para los 3 intervalos de tiempo
  • node.role: indica el rol delservidor
    • Master eligible node (m)
    • Data node (d)
    • Ingest node (i)
    • Coordinating node only (-)
  • master: si es nodo master o no
    • Elected master (*)
    • Not elected master (-)
  • name: nombre del nodo

 

Estado de los índices

  • Comprobamos el estado de los nodos accediendo a: http://172.16.1.61:9200/_cat/indices?v (Ayuda)
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size

Podemos ver los principales datos que se nos muestran:

  • health: salud del índice (green, yellow, red)
  • status: estado
  • index: nombre
  • uuid: identificador único
  • pri: número de primary shards
  • rep: número de replica shards
  • docs.count: número de documentos del índice
  • docs.deleted: número de documentos eliminados
  • store.size: tamaño del índice
  • pri.store.size: tamaño de los shards primarios

En estos momentos no tenemos ningún índice, por lo que no se nos muestra ningún dato. Más adelante veremos los datos mostrados cuando tengamos información almacenada en el cluster.

 

 

Instalación de Logstash

 

Ahora toca el turno de Logstash

  • Instalamos el paquete
yum install logstash -y
  • En este momento se instala la versión: 6.0.0-1
=======================================================================================================================================================
 Package                           Arch                            Version                            Repository                                  Size
=======================================================================================================================================================
Installing:
 logstash                          noarch                          1:6.0.0-1                          elasticsearch-6.x                          108 M

Transaction Summary
=======================================================================================================================================================

 

  • Habilitamos el servicio
systemctl enable logstash

 

Configuración de LogStash

Principales directorios:

  • /usr/share/logstash: directorio home
  • /usr/share/logstash/bin: directorio de binarios
  • /usr/share/logstash/plugins: archivos de plugins
  • /etc/logstash: archivos de configuración
  • /etc/logstash/conf.d: configuración de loas pipelines
  • /var/log/logstash: directorio de logs
  • /var/lib/logstash: directorio donde se guardan datos persistentes en el caso de que sea necesario

 

Los archivos de configuración principales que se ubican en /etc/logstash son:

  • logstash.yml: configuración del servicio
  • jvm.options: opciones de la máquina virtual de java
  • startup.options: opciones de arranque del servicio Linux

Además tenemos los archivos de configuración de las pipelines, que veremos más adelante y que se guadan en el directorio /etc/logstash/conf.d

 

  • Para esta prueba de concepto únicamente vamos a modificar el archivo logstash.yml indicando el nombre del nodo y la dirección y puerto de escucha
# ------------  Node identity ------------
#
# Use a descriptive name for the node:
#
node.name: SRV-ELK-002

# ------------ Metrics Settings --------------
#
# Bind address for the metrics REST endpoint
#
http.host: "172.16.1.61"
#
# Bind port for the metrics REST endpoint, this option also accept a range
# (9600-9700) and logstash will pick up the first available ports.
#
http.port: 9600

Este es el funcionamiento básico de Logstash:

Configuramos:

  • Inputs: la entrada de datos, de que tipo, por que puerto… Algunas de las más comunes son:
  • Filters: analizamos los datos y decidimos que hacer con ellos según su contenido. Algunos de los filtros más utilizados:
  • Outputs: enviamos los datos al servicio o destino correspondiente. Los más comunes

 

 

Definición de la recolección Netflow

Hasta la versión 6 de Logstash, cada instancia de logstash sólo era capaz de manejar una pipeline, por lo que si se quería tener un servidor que recogiese y analizase diferentes tipos de datos, teníamos dos opciones:

  • Ejecutar diferentes procesos, cada uno con diferentes archivos de configuración
  • Definir en los archivos de configuración la lógica para poder

Con la versión 6, tenemos la opción de definir diferentes pipelines y que sean ejecutadas por el mismo proceso de logstash. Vamos a utilizar esta método para generar el pipeline de forma que en el futuro

  • Creamos el archivo /etc/logstash/pipelines.yml con el siguiente contenido
- pipeline.id: vsphere-netflow
  path.config: "/etc/logstash/conf.d/vsphere-netflow/*.conf"

Definimos el identificador del pipeline y la ruta donde se encuentran los archivos de configuración que vamos a utilizar.

  • Editamos el archivo /ect/logstash/logstash.yml y comentamos la siguiente línea
#path.config: /etc/logstash/conf.d/*.conf

 

Ahora vamos con los ficheros que van a definir el comportamiento del pipeline. Vamos a separar cada uno de los pasos en diferentes archivos para facilitar el trabajo, por lo que vamos a crear los siguientes archivos en la carpeta /etc/logstash/conf.d/vsphere-netflow:

  • 10-vsphere-netflow-input.conf
  • 20-vsphere-netflow-filter.conf
  • 30-vsphere-netflow-output.conf

Vamos a revisar ahora el contenido de cada uno de los archivos

  • 10-vsphere-netflow-input.conf
input {
    udp {
        codec => netflow {
            versions => [10]
        }
        port => 2055
        type => ipfix
    }
}

En este archivo indicamos la definición de la entrada de datos:

  • Utilizamos el protocolo udp
  • Utilizamos la versión 10 del codec netflow
  • Utilizamos el puerto 2055 para recibir la información
  • Indicamos el tipo de información, marcándola como ipfix

 

  • 20-vsphere-netflow-filter.conf
filter {

}

En este punto vamos a dejar este archivo vacío para más adelante volver a él y verlo en más detalle. De esta forma no realizaremos ningún proceso de los datos recibidos, tal y como nos llegan, los guardaremos

 

  • 30-vsphere-netflow-output.conf
output {
   elasticsearch {
      hosts => [ "172.16.1.61:9200" ]
      index => "logstash-vsphere-netflow-%{+YYYY.MM.dd}"
   }
}

Aquí definimos lo siguiente:

  • Vamos a guardar la información en elasticsearch
  • El host y el puerto al que vamos a enviar la información
  • El nombre del índice en el que guardar la información, que comenzará con una parte fija «logstash-vsphere-netflow-» y se le añadirá la fecha en formato YYYY.MM.dd de forma que por cada día tendremos un índice diferente.

 

Arranque del servicio

  • Modificamos la configuración del cortafuegos para escuchar en el puerto 2055
firewall-cmd --permanent --add-port=2055/udp
firewall-cmd --reload
  • Arrancamos el servicio
systemctl daemon-reload
systemctl enable logstash
systemctl start logstash

 

Comprobación del funcionamiento de Logstash

  • Primero, comprobamos que el servicio ha arrancado correctamente
[root@srv-elk-002 conf.d]# systemctl status logstash
● logstash.service - logstash
   Loaded: loaded (/etc/systemd/system/logstash.service; disabled; vendor preset: disabled)
   Active: active (running) since Tue 2017-11-28 10:37:21 CET; 13s ago
 Main PID: 1383 (java)
   CGroup: /system.slice/logstash.service
           └─1383 /bin/java -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+D...

Nov 28 10:37:21 srv-elk-002 systemd[1]: Started logstash.
Nov 28 10:37:21 srv-elk-002 systemd[1]: Starting logstash...
  • Ahora vamos a comprobar que se ha creado el índice en elasticsearch consultando «http://172.16.1.61:9200/_cat/indices?v»
health status index                               uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   logstash-vsphere-netflow-2017.11.28 M0lc6AYDRDmeRFUkNcQ1VA   5   1          4            0     45.9kb         45.9kb

Como podemos ver, tenemos un indice disponible y podemos ver la información relacionada con él.

Si no conseguimos ver el índice, es importante revisar los archivos de logs, ya que nos indicarán si hemos cometido algún error en la definición de los archivos .conf (es importante cumplir con el formato de YAML)

 

Instalación de Kibana

En este punto ya tenemos el recolector (logstash) recogiendo y guardando los datos (en elasticsearch) Ahora vamos a proceder a la instalación y configuración de Kibana para visualizarlos.

  • Instalación del paquete
yum install kibana -y
  • Se instala la versión 6.0.0-1
=======================================================================================================================================================
 Package                          Arch                             Version                           Repository                                   Size
=======================================================================================================================================================
Installing:
 kibana                           x86_64                           6.0.0-1                           elasticsearch-6.x                            61 M

Transaction Summary
=======================================================================================================================================================

Los directorios de instalación y configuración son los siguientes:

  • /usr/share/kibana: directorio home
  • /usr/share/kibana/bin: binarios
  • /usr/share/kibana/optimize: donde se ubican los archivos de ciertas operaciones administrativas (como la instalación de plugins)
  • /usr/share/kibana/plugins: directorio de plugins
  • /etc/kibana: archivos de configuración
  • /var/lib/kibana: ubicación de los datos que son necesarios escribir en disco

 

Configuración de Kibana

  • Editamos el archivo /etc/kibana/kibana.yml y modificamos los siguientes valores:
    • server.port
    • server.host
    • server.name
    • elasticsearch.url
# Kibana is served by a back end server. This setting specifies the port to use.
server.port: 5601

# Specifies the address to which the Kibana server will bind. IP addresses and host names are both valid values.
# The default is 'localhost', which usually means remote machines will not be able to connect.
# To allow connections from remote users, set this parameter to a non-loopback address.
server.host: "172.16.1.61"

# The Kibana server's name.  This is used for display purposes.
#server.name: "SRV-ELK-002"

# The URL of the Elasticsearch instance to use for all your queries.
elasticsearch.url: "http://172.16.1.61:9200"

 

  • Añadimos el puerto de acceso a la configuración del cortafuegos
firewall-cmd --permanent --add-port=5601/tcp
firewall-cmd --reload

 

  • Habilitamos el servicio
systemctl daemon-reload
systemctl enable kibana

 

  • Arrancamos el servicio
systemctl start kibana
  • Accedemos con un navegador a la dirección y puerto configurados: http://172.16.1.61:5601/status y comprobamos el estado del servidor

 

 

Acceso a los datos

El siguiente paso es configurar Kibana para poder acceder al índice que estamos generando con logstash en elasticsearch. Para ello, seguimos estos pasos

  • Accedemos al interfaz web de Kibana http://172.16.1.61:5601
  • Automáticamente se nos mostrará la siguiente pantalla, ya que no tenemos ningún índice configurado

  • Indicamos un patrón para el índice que hemos creado, en mi caso «logstash-vsphere-netflow-*»
  • Seleccionamos el campo que se va a utilizar como referencia temporal. Podemos utilizar uno de los campos recibidos.

  • Pinchamos en Create
  • Se nos muestra una ventana con los campos que ha detectado la infraestructura, que como podemos comprobar son los campos del paquete netflow

 

Para ver el contenido de los datos recibidos, vamos a pinchar en el enlace Discover y comprobamos que estamos recibiendo datos y que podemos visualizarlos

 

 

 

Análisis de los datos Netflow recibidos

Antes de comenzar a explotar los datos y crear visualizaciones y dashboards vamos a ver en detalle los datos recibidos y analizar si podemos mejorar su calidad.

Cada una de las líneas que vemos, representa un flujo de informacion enviado por Netflow

Si lo expandimos, podemos ver de forma más clara, los campos y valores de ese flujo:

Vamos a revisar la información mostrada en alguno de los campos más importantes

  • timestamp: es la fecha en la que se recibe el paquete por logstash
  • _index: índice al que pertenece el documento
  • host: dirección IP del elemento que envía el tráfico (en este caso del dvSwitch)

  • netflow.destinationIPv4address: dirección IP de destino del flujo
  • netflow.destinationTransportPort: puerto de destino del flujo
  • netflow.egressInterface: interfaz en el que se ha detectado la salida del flujo. En nuestro caso podemos ver el elemento asociado a ese puerto en la configuración del dvSwitch. Por ejemplo, el puerto 18 corresponde con:

  • netflow.flowDirection: dirección del tráfico desde el punto en el que se observa: (0: ingress – 1: egress)
  • netflow.flowEndMilliseconds: fecha en la que finaliza el flujo
  • netflow.flowEndReason: razón por la que termina el flujo. Pude tener diferentes valores (1: idle timeout – 2: active timeout – 3: end flow – 4: forced end – 5: lack of resources)
  • netflow.flowStartMilliseconds: fecha en la que inicia el flujo
  • netflow.ingressInterface: intefaz de entrada del flujo
  • netflow.octedDeltaCount: número de octetos (bytes) desde el último informe del flujo
  • netflow.packetDeltaCount: número de paquetes desde el último informe del flujo
  • netflow.protocolIdentifier: identificador del protocolo IP definido por la IANA (1: ICMP – 6: TCP – 17: UDP  …)
  • netflow.sourceIPv4Address: dirección IP del origen del flujo
  • netflow.sourceTransportPort: puerto de origen del flujo
  • netflow.version: versión de netflow, en este caso la versión 10 (IPFIX)

Con esta información y el ejemplo de la imagen anterior, podemos identificar que el flujo corresponde con la respuesta desde la máquina 192.168.1.11 a la máquina 192.168.1.13 ante una consulta DNS (protocolIdentifier=17, sourceTransportPort=53). El flujo ha estado compuesto por 2 paquetes (packetDeltaCount) con un tamaño de 218 bytes (octetDeltaCount=218)

Es importante entender que Netflow no envía un paquete por cada paquete de comunicaciones que visualiza. Agrupa los paquetes en flujos, por lo que si, por ejemplo, queremos ver el volumen de tráfico de un flujo, tenemos que fijarnos en el campo octetDeltaCount.

Viendo esta información, se me ocurre un par de formas para mejorar la información que visualizamos

  • Visualizar el tipo de protocolo (TCP, UDP, ICMP…) y el servicio asociado a un puerto concreto (DNS, SSH…) sin tener que conocer los puertos correspondientes para cada protocolo y servicio.
  • Obtener el nombre de los equipos involucrados en el flujo y no solo las direcciones IP, para poder facilitar la visualización de los datos

 

Para esto vamos a volver a la configuración del pipeline en Logstash y configurar un filtro que nos ayude en estas tareas.

 

Filtrado de datos con Logstash

Para poder añadir ese tipo de información, vamos a trabajar con los filtros de logstash, utilizando varios de los plugins que tenemos disponibles (Filter plugins)

  • Editamos el archivo «/etc/logstash/conf.d/vsphere-netflow/20-vsphere-netflow-filter.conf.old» y añadimos lo siguiente
filter {
   # Comprobamos que existe el campo "protocolIdentifier"
   if [netflow][protocolIdentifier] {
        # Buscamos el nombre asociado al número del protocolo
        translate {
            dictionary_path => "/etc/logstash/dictionaries/iana_protocol_numbers.yml"
            field => "[netflow][protocolIdentifier]"
            destination => "[netflow][protocolIdentifierName]"
            fallback => "UNKNOWN"
        }

        # Para los dos principales protocolos (TCP y UDP) buscamos el nombre del servicio
        # TCP
        if [netflow][protocolIdentifier] == 6 {
            # Puerto origen
            if [netflow][sourceTransportPort] {
                translate {
                    dictionary_path => "/etc/logstash/dictionaries/iana_service_names_tcp.yml"
                    field => "[netflow][sourceTransportPort]"
                    destination => "[netflow][sourceTransportPortName]"
                    fallback => "UNKNOWN"
                }
            }
            # Puerto destino
            if [netflow][destinationTransportPort] {
                translate {
                    dictionary_path => "/etc/logstash/dictionaries/iana_service_names_tcp.yml"
                    field => "[netflow][destinationTransportPort]"
                    destination => "[netflow][destinationTransportPortName]"
                    fallback => "UNKNOWN"
                }
            }
        }
        # UDP
        else if [netflow][protocolIdentifier] == 17 {
            # Puerto origen
            if [netflow][sourceTransportPort] {
                translate {
                    dictionary_path => "/etc/logstash/dictionaries/iana_service_names_udp.yml"
                    field => "[netflow][sourceTransportPort]"
                    destination => "[netflow][sourceTransportPortName]"
                    fallback => "UNKNOWN"
                }
            }
            # Puerto destino
            if [netflow][destinationTransportPort] {
                translate {
                    dictionary_path => "/etc/logstash/dictionaries/iana_service_names_udp.yml"
                    field => "[netflow][destinationTransportPort]"
                    destination => "[netflow][destinationTransportPortName]"
                    fallback => "UNKNOWN"
                }
            }
        }
    }


    # Resolvemos los nombres de las direcciones IP
    # Origen
    if [netflow][sourceIPv4Address] {
       mutate {
          copy => {"[netflow][sourceIPv4Address]" => "[netflow][sourceAddressName]"}
       }
       dns {
          action => "replace"
          reverse => ["[netflow][sourceAddressName]"]
          hit_cache_size => 4096
          hit_cache_ttl => 900
          failed_cache_size => 512
          failed_cache_ttl => 900
       }
    }

    # Destino
    if [netflow][destinationIPv4Address] {
       mutate {
          copy => {"[netflow][destinationIPv4Address]" => "[netflow][destinationAddressName]"}
       }
       dns {
          action => "replace"
          reverse => ["[netflow][destinationAddressName]"]
          hit_cache_size => 4096
          hit_cache_ttl => 900
          failed_cache_size => 512
          failed_cache_ttl => 900
       }
    }
}
  • Explicación del contenido del archivos:
    • Resolver los nombres de los protocolos y servicios
      • Utilizamos condicionales (if) para comprobar la existencia de un campo o de su valor
      • Utilizamos el plugin translate que nos permite buscar y reemplazar valores en archivos YAML, JSON o CSV
      • Utilizamos varios «diccionarios» para obtener los nombres de los protocolos y servicios
      • Creamos nuevos campos (protocolIdentifierName, sourceTransportPortName y destinationTransportPortName) con esta información
    • Resolver los nombres de las direcciones IP origen y destino (esta configuración puede provocar una sobrecarga en el proceso y hay que utilizarla con cuidado, ya que por cada flujo puede realizar 2 consultas de nombres.
      • Creamos un nuevo campo llamado destinationAddressName como copia del campo destinationIPv4Address con el plugin mutate
      • Con el plugin dns reemplazamos el contenido de destinationAddressName con la resolución inversa que nos devuelve el nombre fqdn
      • Optimizamos las consultas con la configuración de una caché

 

  • Vamos a utilizar 3 archivos como «diccionarios» donde tenemos la relación de los números y los nombres relacionados con los protocolos y los servicios. Copiamos a la carpeta /etc/logstash/dictionaries los archivos
    • iana_protocol_numbers.yml
    • iana_service_names_tcp.yml
    • iana_service_names_udp.yml

 

Para comprobar que estamos recibiendo la información con los nuevos campos, volvemos a Kibana y podemos:

  • Comprobar en el índice (Management -> Index Patterns) que al refrescar vemos los nuevos campos

  • Ir directamente a Discover y comprobar en los datos recibidos los nuevos campos

 

Ejemplo

Vamos a ver con un ejemplo sencillo como visualizamos el tráfico generado por una máquina en concreto. Para ello vamos a realizar una sencilla prueba. Vamos a realizar un ping enviando 82 paquetes desde una máquina de laboratorio a uno de los servidores DNS de google:

  • IP origen: 192.168.1.31
  • IP destino: 8.8.4.4
  • Protocolo: ICMP
  • Paquetes: 82
[root@srv-elk-001 ~]# ping -c 82 8.8.4.4
PING 8.8.4.4 (8.8.4.4) 56(84) bytes of data.
64 bytes from 8.8.4.4: icmp_seq=1 ttl=55 time=41.9 ms
64 bytes from 8.8.4.4: icmp_seq=2 ttl=55 time=42.8 ms
64 bytes from 8.8.4.4: icmp_seq=3 ttl=55 time=43.0 ms
64 bytes from 8.8.4.4: icmp_seq=4 ttl=55 time=41.6 ms
...
64 bytes from 8.8.4.4: icmp_seq=80 ttl=55 time=41.1 ms
64 bytes from 8.8.4.4: icmp_seq=81 ttl=55 time=42.0 ms
64 bytes from 8.8.4.4: icmp_seq=82 ttl=55 time=40.8 ms

--- 8.8.4.4 ping statistics ---
82 packets transmitted, 82 received, 0% packet loss, time 81113ms
rtt min/avg/max/mdev = 40.340/42.558/79.187/4.199 ms


 

Lo que vemos en Kibana es lo siguiente :

 

  • Vemos 2 flujos de datos para un total de 82 paquetes
    • El primero con 60 paquetes enviados
    • El segundo con 22 paquetes enviados
  • Podemos ver que todos los campos corresponden con el tráfico enviado
    • Dirección IP origen: 192.168.1.31
    • Dirección IP destino: 8.8.4.4
    • Resolución de los nombres de origen y destino
    • El tipo de protocolo como ICMP

 

 

Creando visualizaciones y dashboards

Al tener ya los datos disponibles, el último paso que nos queda es preparar gráficos y tablas que nos permitan visualizar aquellos datos que consideremos más importantes sobre los flujos recogidos.

Vamos a ver por ejemplo, como visualizar el tipo de protocolos que se están utilizando en las comunicaciones, tanto por número de paquetes, como por bytes como por número de flujos registrados.

  • Accedemos al apartado Visualize

  • Aquí vamos a crear los diferentes tipos de gráficos que vamos a poder visualizar en los Dashboards
  • Pinchamos en el símbolo [+]
  • Seleccionamos el tipo de visualización, por ejemplo, vamos a comenzar con un gráfico de tipo tarta

  • Seleccionamos el índice que queremos utilizar

  • Primero desmarcamos en el apartado Options, la opción «Donut»
  • Vamos a comenzar con el gráfico que muestra el número de flujos
  • En Metrics, dejamos seleccionada el valor Count en Aggregation
  • Pinchamos en Split Slices y seleccionamos Term y a continuación el campo netflow.ProtocolIdentifierName

 

 

  • Pinchamos en Save y le asignamos un nombre
  • Repetimos los pasos, pero en este caso, vamos a visualizar el número de bytes asociados a cada protocolo
    • En este caso, seleccionamos en el apartado Metrics: Sum -> netflow.octedDeltaCount -> Bytes
    • En Buckets: Split Slices -> Terms -> netflow.protocolIdentifierName.keyword -> metric:Bytes

 

  • Vamos creando el resto de gráficos que queremos utilizar

Una vez que hemos creado los diferentes gráficos de diferentes tipos, como por ejemplo:

  •  Accedemos a Dashboard

  • Vamos añadiendo los diferentes gráficos

  • Hasta ajustarlos al diseño de Dashboard que queremos

  • Para finalizar guardamos el dashboard y comprobamos como se visualiza

De forma similar, podemos proceder para visualizar otros datos, como pueden ser:

  • Servicios utilizados
  • Tráfico por clientes
  • Tráfico por interfaz

 

Siguientes pasos

En la siguiente entrada veremos como aprovechar la infraestructura que hemos creado con ELK para recopilar la información de Syslog de la infraestructura vSphere (vCenter y servidores ESXi) realizando funciones similares a vRealize Log Insight o SexiLog.

 

 

Deja un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Este sitio usa Akismet para reducir el spam. Aprende cómo se procesan los datos de tus comentarios.