Logstash analiza el documento xml que contiene varias entradas de registro

8

Actualmente estoy evaluando si logstash y elasticsearch son útiles para nuestro caso de uso. Lo que tengo es un archivo de registro que contiene múltiples entradas que tiene la forma

<root>
    <entry>
        <fieldx>...</fieldx>
        <fieldy>...</fieldy>
        <fieldz>...</fieldz>
        ...
        <fieldarray>
            <fielda>...</fielda>
            <fielda>...</fielda>
            ...
        </fieldarray>
    </entry>
    <entry>
    ...
    </entry>
    ...
<root>

Cada entryelemento contendría un evento de registro. (Si está interesado, el archivo es en realidad una exportación de registro de trabajo Tempo Timesheets (An Atlassian JIRA Plug-in)).

¿Es posible transformar dicho archivo en múltiples eventos de registro sin escribir mi propio códec?

dualed
fuente

Respuestas:

11

Muy bien, encontré una solución que funciona para mí. El mayor problema con la solución es que el complemento XML no es ... bastante inestable, pero está mal documentado y tiene errores o está mal documentado de forma incorrecta.

TLDR

Línea de comando bash:

gzcat -d file.xml.gz | tr -d "\n\r" | xmllint --format - | logstash -f logstash-csv.conf

Configuración de Logstash:

input {
    stdin {}
}

filter {
    # add all lines that have more indentation than double-space to the previous line
    multiline {
        pattern => "^\s\s(\s\s|\<\/entry\>)"
        what => previous
    }
    # multiline filter adds the tag "multiline" only to lines spanning multiple lines
    # We _only_ want those here.
    if "multiline" in [tags] {
        # Add the encoding line here. Could in theory extract this from the
        # first line with a clever filter. Not worth the effort at the moment.
        mutate {
            replace => ["message",'<?xml version="1.0" encoding="UTF-8" ?>%{message}']
        }
        # This filter exports the hierarchy into the field "entry". This will
        # create a very deep structure that elasticsearch does not really like.
        # Which is why I used add_field to flatten it.
        xml {
            target => entry
            source => message
            add_field => {
                fieldx         => "%{[entry][fieldx]}"
                fieldy         => "%{[entry][fieldy]}"
                fieldz         => "%{[entry][fieldz]}"
                # With deeper nested fields, the xml converter actually creates
                # an array containing hashes, which is why you need the [0]
                # -- took me ages to find out.
                fielda         => "%{[entry][fieldarray][0][fielda]}"
                fieldb         => "%{[entry][fieldarray][0][fieldb]}"
                fieldc         => "%{[entry][fieldarray][0][fieldc]}"
            }
        }
        # Remove the intermediate fields before output. "message" contains the
        # original message (XML). You may or may-not want to keep that.
        mutate {
            remove_field => ["message"]
            remove_field => ["entry"]
        }
    }
}

output {
    ...
}

Detallado

Mi solución funciona porque al menos hasta el entrynivel, mi entrada XML es muy uniforme y, por lo tanto, puede manejarse mediante algún tipo de coincidencia de patrones.

Dado que la exportación es básicamente una línea muy larga de XML, y el complemento logstash xml funciona esencialmente solo con campos (léase: columnas en líneas) que contienen datos XML, tuve que cambiar los datos a un formato más útil.

Shell: preparar el archivo

  • gzcat -d file.xml.gz |: Era demasiada información, obviamente puedes omitir eso
  • tr -d "\n\r" |: Eliminar saltos de línea dentro de elementos XML: algunos de los elementos pueden contener saltos de línea como datos de caracteres. El siguiente paso requiere que se eliminen o se codifiquen de alguna manera. Aunque se supone que en este punto tiene todo el código XML en una línea masiva, no importa si este comando elimina cualquier espacio en blanco entre los elementos

  • xmllint --format - |: Formatee el XML con xmllint (viene con libxml)

    Aquí la única línea de espagueti enorme de XML ( <root><entry><fieldx>...</fieldx></entry></root>) tiene el formato correcto:

    <root>
      <entry>
        <fieldx>...</fieldx>
        <fieldy>...</fieldy>
        <fieldz>...</fieldz>
        <fieldarray>
          <fielda>...</fielda>
          <fieldb>...</fieldb>
          ...
        </fieldarray>
      </entry>
      <entry>
        ...
      </entry>
      ...
    </root>
    

Logstash

logstash -f logstash-csv.conf

(Consulte el contenido completo del .confarchivo en la sección TL; DR).

Aquí, el multilinefiltro hace el truco. Puede combinar varias líneas en un solo mensaje de registro. Y es por eso que xmllintfue necesario formatear con :

filter {
    # add all lines that have more indentation than double-space to the previous line
    multiline {
        pattern => "^\s\s(\s\s|\<\/entry\>)"
        what => previous
    }
}

Básicamente, esto dice que cada línea con sangría que tiene más de dos espacios (o es </entry>/ xmllint hace sangría con dos espacios por defecto) pertenece a una línea anterior. Esto también significa que los datos de los caracteres no deben contener líneas nuevas (despojadas con trshell) y que el xml debe estar normalizado (xmllint)

dualed
fuente
Hola, ¿lograste hacer que esto funcione? Tengo curiosidad porque tengo una necesidad similar y la solución multilínea junto con la división no funcionó para mí. Gracias por sus comentarios
saber, el
@viz Esto funcionó, pero nunca lo usamos en producción. Multilínea solo funciona si tiene una estructura XML muy regular y la ha formateado primero con sangría (vea la respuesta, sección "preparación del archivo")
fechado el
1

Tuve un caso similar. Para analizar este xml:

<ROOT number="34">
  <EVENTLIST>
    <EVENT name="hey"/>
    <EVENT name="you"/>
  </EVENTLIST>
</ROOT>

Yo uso esta configuración para logstash:

input {
  file {
    path => "/path/events.xml"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => multiline {
      pattern => "<ROOT"
      negate => "true"
      what => "previous"
      auto_flush_interval => 1
    }
  }
}
filter {
  xml {
    source => "message"
    target => "xml_content"
  }
  split {
    field => "xml_content[EVENTLIST]"
  }
  split {
    field => "xml_content[EVENTLIST][EVENT]"
  }
  mutate {
    add_field => { "number" => "%{xml_content[number]}" }
    add_field => { "name" => "%{xml_content[EVENTLIST][EVENT][name]}" }
    remove_field => ['xml_content', 'message', 'path']
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

Espero que esto pueda ayudar a alguien. He necesitado mucho tiempo para conseguirlo.

drinor
fuente