Log shipping through ELK

A common DevOps task is to build a logging pipeline with the ELK stack (Elasticsearch, Logstash, Kibana). Suppose the application is written in Java and currently uses log4j's RollingFileAppender to generate log files locally. We can use log4j's SocketAppender to write to Logstash, which in turn pushes the log stream to the Elasticsearch cluster.
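For reference, the SocketAppender route might look like the following log4j 1.x properties sketch. The host name is a placeholder, and 4560 is the default port of Logstash's log4j input (it also appears in the Logstash log below):

```properties
# log4j.properties sketch -- host name is a placeholder
log4j.rootLogger=DEBUG, file, logstash

# keep the existing local rolling file appender ("file") as-is, and
# add a socket appender that ships events to the Logstash log4j input
log4j.appender.logstash=org.apache.log4j.net.SocketAppender
log4j.appender.logstash.RemoteHost=logstash.example.com
log4j.appender.logstash.Port=4560
log4j.appender.logstash.ReconnectionDelay=10000
```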

In this model, a failure to push a log line surfaces in the log4j output as follows:

2018-09-06 11:05:39,778,ERROR,stderr - [AsyncAppender-Dispatcher-Thread-672] log4j:WARN Detected problem with connection: java.net.SocketException: Broken pipe (Write failed)

The Logstash log shows the socket exception as well:

[2018-09-06T10:12:21,935][DEBUG][logstash.inputs.log4j ] Accepted connection {:client=>"192.168.111.56:58118", :server=>"0.0.0.0:4560"}
[2018-09-06T10:12:21,963][DEBUG][logstash.pipeline ] filter received {"event"=>{"method"=>"?", "thread"=>"676774870@qtp-1804103302-6", "message"=>"Unable to resolve session ID from SessionKey [org.apache.shiro.web.session.mgt.WebSessionKey@71da9a4]. Returning null to indicate a session could not be found.", "priority"=>"DEBUG", "type"=>"log4j", "path"=>"org.apache.shiro.session.mgt.DefaultSessionManager", "@timestamp"=>2018-09-06T15:12:21.950Z, "file"=>"?:?", "@version"=>"1", "host"=>"192.168.111.56:58118", "logger_name"=>"org.apache.shiro.session.mgt.DefaultSessionManager", "class"=>"?", "timestamp"=>1536246741950}}
[2018-09-06T10:12:21,963][DEBUG][logstash.pipeline ] output received {"event"=>{"method"=>"?", "thread"=>"676774870@qtp-1804103302-6", "message"=>"Unable to resolve session ID from SessionKey [org.apache.shiro.web.session.mgt.WebSessionKey@71da9a4]. Returning null to indicate a session could not be found.", "priority"=>"DEBUG", "type"=>"log4j", "path"=>"org.apache.shiro.session.mgt.DefaultSessionManager", "@timestamp"=>2018-09-06T15:12:21.950Z, "file"=>"?:?", "@version"=>"1", "host"=>"192.168.111.56:58118", "logger_name"=>"org.apache.shiro.session.mgt.DefaultSessionManager", "class"=>"?", "timestamp"=>1536246741950}}
[2018-09-06T10:12:22,041][DEBUG][logstash.inputs.log4j ] Closing connection {:client=>"192.168.111.56:58118", :exception=>java.io.InvalidObjectException: Object type java.util.Hashtable is not allowed.}

Troubleshooting this setup is not straightforward, so an alternative is preferred: keep the existing RollingFileAppender and the local log files, but run a Filebeat agent on each application node. Here is the diagram:

ELK Log Shipping Pipeline
  1. Filebeat is a lightweight agent installed alongside the application on the same server or container.
  2. The text streams converge on the Logstash nodes, which push the merged stream to the Elasticsearch cluster.
  3. The Elasticsearch cluster ingests the stream for indexing.
  4. Kibana provides search and visualization.

This is an example architecture with many potential variations. For example, the Logstash service may be deployed on the same hosts as the Elasticsearch cluster, and if there aren't many application nodes, Filebeat may push its output directly to Elasticsearch. The architecture provides a lot of flexibility and scalability.
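As a concrete sketch of the fan-in, the Filebeat output section might look like this. The host names are placeholders, and 5044 is the conventional Beats port:

```yaml
# filebeat.yml output section -- ship to the Logstash tier,
# load-balancing across the nodes (host names are placeholders)
output.logstash:
  hosts: ["logstash1:5044", "logstash2:5044"]
  loadbalance: true

# Direct-to-Elasticsearch variant for small deployments
# (Filebeat allows only one output to be enabled at a time):
# output.elasticsearch:
#   hosts: ["es1:9200"]
```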

With the log4j format, two challenges need to be addressed:

  • identifying multi-line entries in the log
  • mapping sections of a log entry into fields in Elasticsearch

For example, the following log excerpt illustrates both challenges:

2018-09-06 11:52:18,022,DEBUG,service.dataportal.web.retrieve - [687285925@qtp-564051174-3350] creating streaming output request 9868594 and requid=fb9a2cad-e24b-4eb8-ad62-b027184e7b9b
2018-09-06 11:52:18,054,ERROR,org.glassfish.jersey.server.ServerRuntime$Responder - [687285925@qtp-564051174-3350] An I/O error has occurred while writing a response message entity to the container output stream.
org.glassfish.jersey.server.internal.process.MappableException: org.mortbay.jetty.EofException
        at org.glassfish.jersey.server.internal.MappableExceptionWrapperInterceptor.aroundWriteTo(MappableExceptionWrapperInterceptor.java:91)
        at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:163)
        at org.glassfish.jersey.message.internal.MessageBodyFactory.writeTo(MessageBodyFactory.java:1135)
        at org.glassfish.jersey.server.ServerRuntime$Responder.writeResponse(ServerRuntime.java:662)
        at org.glassfish.jersey.server.ServerRuntime$Responder.processResponse(ServerRuntime.java:395)
        at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:385)
        at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:280)
        at org.glassfish.jersey.internal.Errors$1.call(Errors.java:272)
        at org.glassfish.jersey.internal.Errors$1.call(Errors.java:268)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:316)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:298)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:268)
        at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:289)
        at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:256)
        at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:703)
        at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:416)
        at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:370)
        at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:389)
        at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:342)
        at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:229)
        at org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511)
        at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:390)
        at org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182)
        at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:765)
        at org.mortbay.jetty.handler.HandlerList.handle(HandlerList.java:49)
        at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
        at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
        at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
        at org.mortbay.jetty.handler.RequestLogHandler.handle(RequestLogHandler.java:49)
        at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
        at org.mortbay.jetty.Server.handle(Server.java:326)
        at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542)
        at org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:926)
        at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:549)
        at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:212)
        at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404)
        at org.mortbay.jetty.bio.SocketConnector$Connection.run(SocketConnector.java:228)
        at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)
Caused by: org.mortbay.jetty.EofException
        at org.mortbay.jetty.HttpGenerator.flush(HttpGenerator.java:789)
        at org.mortbay.jetty.AbstractGenerator$Output.flush(AbstractGenerator.java:568)
        at org.mortbay.jetty.HttpConnection$Output.flush(HttpConnection.java:1010)
        at org.glassfish.jersey.servlet.internal.ResponseWriter$NonCloseableOutputStreamWrapper.flush(ResponseWriter.java:330)
        at org.glassfish.jersey.message.internal.CommittingOutputStream.flush(CommittingOutputStream.java:287)
        at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$UnCloseableOutputStream.flush(WriterInterceptorExecutor.java:305)
        at org.glassfish.jersey.message.internal.StreamingOutputProvider.writeTo(StreamingOutputProvider.java:79)
        at org.glassfish.jersey.message.internal.StreamingOutputProvider.writeTo(StreamingOutputProvider.java:61)
        at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$TerminalWriterInterceptor.invokeWriteTo(WriterInterceptorExecutor.java:266)
        at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$TerminalWriterInterceptor.aroundWriteTo(WriterInterceptorExecutor.java:251)
        at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:163)
        at org.glassfish.jersey.server.internal.JsonWithPaddingInterceptor.aroundWriteTo(JsonWithPaddingInterceptor.java:109)
        at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:163)
        at org.glassfish.jersey.spi.ContentEncoder.aroundWriteTo(ContentEncoder.java:137)
        at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:163)
        at org.glassfish.jersey.server.internal.MappableExceptionWrapperInterceptor.aroundWriteTo(MappableExceptionWrapperInterceptor.java:85)
        ... 40 more
Caused by: java.net.SocketException: Broken pipe (Write failed)
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
        at org.mortbay.io.ByteArrayBuffer.writeTo(ByteArrayBuffer.java:368)
        at org.mortbay.io.bio.StreamEndPoint.flush(StreamEndPoint.java:122)
        at org.mortbay.jetty.HttpGenerator.flush(HttpGenerator.java:723)
        ... 58 more

To address multi-line entries, we tell Filebeat how to recognize the start of an entry through its multiline.pattern configuration option:

- type: log
  enabled: true
  paths:
  - /var/log/dhunch/app.log
  multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}'
  multiline.negate: true
  multiline.match: after
  fields:
    product:
      name: dhunchapp
    log:
      type: app_log
      content: diagnostic
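The effect of multiline.negate: true with multiline.match: after is that every line not matching the timestamp pattern is appended to the previous entry. A minimal Python sketch of that grouping, with the sample lines abbreviated:

```python
import re

# Same start-of-entry pattern as in the Filebeat config above
START = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}")

lines = [
    "2018-09-06 11:52:18,054,ERROR,... An I/O error has occurred",
    "org.glassfish.jersey...MappableException: org.mortbay.jetty.EofException",
    "        at org.glassfish.jersey.server.internal...",
    "2018-09-06 11:52:19,001,DEBUG,... next entry",
]

entries, current = [], []
for line in lines:
    if START.match(line):    # this line begins a new entry
        if current:
            entries.append("\n".join(current))
        current = [line]
    else:                    # continuation: glue onto the previous entry
        current.append(line)
if current:
    entries.append("\n".join(current))

print(len(entries))  # 2: the exception lines collapse into the first entry
```

This is how the ERROR entry and its forty-line stack trace above arrive in Logstash as a single event instead of dozens of fragments.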

When Filebeat pushes log entries to Logstash, an additional “time” field is added, reflecting the time the log is ingested. The entire log4j line is placed in a single field called “message”, which contains the application timestamp, the thread, the class, the logging level and the actual diagnostic message. Because these are not mapped to separate fields, searching in Elasticsearch is difficult.

Section mapping is addressed on the Logstash side, using a plugin called the grok filter. Its job here is to parse the original “message” field and map its sections into separate fields, respectively named logtime, loglevel, logclass, logthread and logmsg. The filter configuration is as follows:

filter {
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:logtime},%{LOGLEVEL:loglevel},%{NOTSPACE:logclass} - \[%{DATA:logthread}\] %{GREEDYDATA:logmsg}" }
  }
  date {
    match => [ "logtime" , "yyyy-MM-dd HH:mm:ss,SSS" ]
    timezone => "America/Chicago"
    target => "@timestamp"
  }
  mutate {
    replace => [ "message" , "%{logmsg}" ]
  }
  mutate {
    remove_field => [ "logmsg" ]
  }
}
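Grok patterns compile down to regular expressions, so the mapping can be checked offline. The Python sketch below is a rough equivalent of the pattern above (not the exact grok-compiled expression), applied to the first sample log line:

```python
import re

# Rough regex equivalent of the grok pattern:
# TIMESTAMP_ISO8601 -> the date/time prefix, LOGLEVEL -> [A-Z]+,
# NOTSPACE -> \S+, DATA -> the bracketed thread, GREEDYDATA -> the rest
LOG4J_RE = re.compile(
    r"(?P<logtime>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}),"
    r"(?P<loglevel>[A-Z]+),"
    r"(?P<logclass>\S+) - "
    r"\[(?P<logthread>[^\]]*)\] "
    r"(?P<logmsg>.*)",
    re.DOTALL,
)

line = ("2018-09-06 11:52:18,022,DEBUG,service.dataportal.web.retrieve"
        " - [687285925@qtp-564051174-3350] creating streaming output"
        " request 9868594 and requid=fb9a2cad-e24b-4eb8-ad62-b027184e7b9b")

m = LOG4J_RE.match(line)
print(m.group("loglevel"))   # DEBUG
print(m.group("logclass"))   # service.dataportal.web.retrieve
```

Testing the pattern this way (or in Kibana's Grok Debugger) before deploying avoids a flood of _grokparsefailure-tagged events.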

The date filter parses the logtime field with the given format and time zone and stores the result in @timestamp, so Elasticsearch treats it as a datetime. The two mutate filters together effectively rename “logmsg” to “message”: the first replaces the “message” field with the parsed logmsg, and the second drops the now-redundant logmsg field. Elasticsearch then sees a clean “message” field containing only the diagnostic text.
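What the date filter does can be sketched in plain Python: parse the logtime string with the log4j layout, attach the configured zone, and emit a UTC @timestamp (zoneinfo requires Python 3.9+):

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

# Parse "yyyy-MM-dd HH:mm:ss,SSS" and localize to the configured zone;
# the logtime value is taken from the sample log entry above
logtime = "2018-09-06 11:52:18,022"
local = datetime.strptime(logtime, "%Y-%m-%d %H:%M:%S,%f").replace(
    tzinfo=ZoneInfo("America/Chicago"))

# Elasticsearch stores @timestamp in UTC (11:52 CDT -> 16:52 UTC)
stamp = local.astimezone(timezone.utc)
print(stamp.isoformat())  # 2018-09-06T16:52:18.022000+00:00
```

Getting the timezone option right matters: without it, the naive logtime would be indexed as if it were already UTC, skewing every Kibana query by five or six hours.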