1. logstash-input-tcp 可用,但log4j会把对象序列化,在kibana看到的会是乱码
  2. logstash-input-log4j 不可用,官方已废弃该插件, log4j连接不上该插件
  3. logstash-input-redis 可用,配置简洁,有安全保障

本篇以 logstash-input-redis插件配置elk

下载sebp/elk镜像

1
docker pull sebp/elk

启动容器

  • 创建docker-compose.yml文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    # Single-service Compose file that runs the sebp/elk image.
    version: '3'
    services:
      elk:
        image: sebp/elk
        # host:container port mappings published by the elk service
        ports:
          - "5601:5601"
          - "9200:9200"
          - "5044:5044"
        restart: always
  • 设置主机的虚拟内存

    1
    sysctl -w vm.max_map_count=262144
  • 启动容器

    1
    docker-compose up -d

配置Logstash虚拟日志条目

  • 进入容器
    1
    docker exec -ti elk_elk_1 bash

删除/etc/logstash/conf.d/下的文件,并新增log4j.conf, 写入以下内容, 然后重启elk

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# Pipeline: receive log events from Redis and index them into Elasticsearch.
input {
redis {
host => "127.0.0.1"
port => 6379
password => "123456"
# Value substituted into %{type} in the output section below
type => "redis-logs"
# "channel": events arrive on a Redis channel (producers use PUBLISH)
data_type => "channel"
# Name of the channel the producers publish to
key => "logstash-log"
}
}
output {
elasticsearch {
hosts => ["localhost"]
# Index name is built from the event type and the event date
index => "logstash-%{type}-%{+YYYY.MM.dd}"
document_type => "%{type}"
# Send a batch once 20000 events have been buffered ...
flush_size => 20000
# ... or after 10 seconds with whatever has been buffered so far
idle_flush_time => 10
sniffing => true
template_overwrite => true
# NOTE(review): document_type / flush_size / idle_flush_time may not be accepted
# by newer versions of the elasticsearch output plugin - confirm against the
# Logstash version shipped in the image.
}
}

output 说明 Logstash 会努力攒到 20000 条数据一次性发送出去,但是如果 10 秒钟内也没攒够 20000 条,Logstash 还是会以当前攒到的数据量发一次。

测试

  1. 用任意Redis client连接到redis, 执行 publish logstash-log "debug message !!!!"
  2. 过10秒后可以在kibana界面上看到,debug message !!!!的日志

kibana设置

首次进入Kibana时会要求创建一个index pattern, 填写上面Logstash配置文件log4j.conf中output的elasticsearch index前缀加通配符即可(例如 logstash-*), 创建后在首页就能查询到日志了

集成到java web项目

以Jedis为例, 配置redis过程略

  • log4j to redis Appender

    依赖 Jedis, 为了从JedisPool取出连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.ErrorCode;
import org.apache.log4j.spi.LoggingEvent;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisAppender extends AppenderSkeleton{
private static final String DEFAULT_KEYTYPE = "channel";
private static final String DEFAULT_KEY = "logstash-log";
String host = "127.0.0.1";
int port = 6379;
String password;
String keyType = DEFAULT_KEYTYPE;
String key = DEFAULT_KEY;
int timeout = 2000;

// 连接池设置
private long minEvictableIdleTimeMillis = 60000L;
private long timeBetweenEvictionRunsMillis = 30000L;
private int numTestsPerEvictionRun = -1;
private int maxTotal = 8;
private int maxIdle = 0;
private int minIdle = 0;
private boolean blockWhenExhaused = false;
private String evictionPolicyClassName;
private boolean lifo = false;
private boolean testOnBorrow = false;
private boolean testWhileIdle = false;
private boolean testOnReturn = false;

static private JedisPool jedisPool;

@Override
public void activateOptions() {
super.activateOptions();
JedisPoolConfig poolConfig = new JedisPoolConfig();
if (lifo) {
poolConfig.setLifo(lifo);
}
if (testOnBorrow) {
poolConfig.setTestOnBorrow(testOnBorrow);
}
if (testWhileIdle) {
poolConfig.setTestWhileIdle(testWhileIdle);
}
if (testOnReturn) {
poolConfig.setTestOnReturn(testOnReturn);
}
if (timeBetweenEvictionRunsMillis > 0) {
poolConfig.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
}
if (evictionPolicyClassName != null && evictionPolicyClassName.length() > 0) {
poolConfig.setEvictionPolicyClassName(evictionPolicyClassName);
}
if (blockWhenExhaused) {
poolConfig.setBlockWhenExhausted(blockWhenExhaused);
}
if (minIdle > 0) {
poolConfig.setMinIdle(minIdle);
}
if (maxIdle > 0) {
poolConfig.setMaxIdle(maxIdle);
}
if (numTestsPerEvictionRun > 0) {
poolConfig.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
}
if (maxTotal != 8) {
poolConfig.setMaxTotal(maxTotal);
}
if (minEvictableIdleTimeMillis > 0) {
poolConfig.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
}

if (password != null && password.length() > 0) {
jedisPool = new JedisPool(poolConfig, host, port, timeout, password);
} else {
jedisPool = new JedisPool(poolConfig, host, port, timeout);
}
// 配置连接实验
try {
Jedis jedis = jedisPool.getResource();
jedis.ping();
} catch (Exception e) {
LogLog.error("Redis is can not connected: " + e.getMessage());
}
}

@Override
protected void append(LoggingEvent event) {
Jedis resource = null;
try {
if (jedisPool == null){
return;
}
resource = jedisPool.getResource();
String format = getLayout().format(event);
if (DEFAULT_KEYTYPE.equals(keyType)){
resource.publish(key,format);
}else {
resource.lpush(key,format);
}
}catch (Exception e){

}finally {
if (resource != null){
resource.close();
}
}
}

@Override
public void close() {
if (jedisPool!=null){
jedisPool.destroy();
}
}

@Override
public boolean requiresLayout() {
return true;
}

public String getHost() {
return host;
}

public void setHost(String host) {
this.host = host;
}

public int getPort() {
return port;
}

public void setPort(int port) {
this.port = port;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

public String getKeyType() {
return keyType;
}

public void setKeyType(String keyType) {
this.keyType = keyType;
}

public String getKey() {
return key;
}

public void setKey(String key) {
this.key = key;
}

public int getTimeout() {
return timeout;
}

public void setTimeout(int timeout) {
this.timeout = timeout;
}

public long getMinEvictableIdleTimeMillis() {
return minEvictableIdleTimeMillis;
}

public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) {
this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
}

public long getTimeBetweenEvictionRunsMillis() {
return timeBetweenEvictionRunsMillis;
}

public void setTimeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis) {
this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
}

public int getNumTestsPerEvictionRun() {
return numTestsPerEvictionRun;
}

public void setNumTestsPerEvictionRun(int numTestsPerEvictionRun) {
this.numTestsPerEvictionRun = numTestsPerEvictionRun;
}

public int getMaxTotal() {
return maxTotal;
}

public void setMaxTotal(int maxTotal) {
this.maxTotal = maxTotal;
}

public int getMaxIdle() {
return maxIdle;
}

public void setMaxIdle(int maxIdle) {
this.maxIdle = maxIdle;
}

public int getMinIdle() {
return minIdle;
}

public void setMinIdle(int minIdle) {
this.minIdle = minIdle;
}

public boolean isBlockWhenExhaused() {
return blockWhenExhaused;
}

public void setBlockWhenExhaused(boolean blockWhenExhaused) {
this.blockWhenExhaused = blockWhenExhaused;
}

public String getEvictionPolicyClassName() {
return evictionPolicyClassName;
}

public void setEvictionPolicyClassName(String evictionPolicyClassName) {
this.evictionPolicyClassName = evictionPolicyClassName;
}

public boolean isLifo() {
return lifo;
}

public void setLifo(boolean lifo) {
this.lifo = lifo;
}

public boolean isTestOnBorrow() {
return testOnBorrow;
}

public void setTestOnBorrow(boolean testOnBorrow) {
this.testOnBorrow = testOnBorrow;
}

public boolean isTestWhileIdle() {
return testWhileIdle;
}

public void setTestWhileIdle(boolean testWhileIdle) {
this.testWhileIdle = testWhileIdle;
}

public boolean isTestOnReturn() {
return testOnReturn;
}

public void setTestOnReturn(boolean testOnReturn) {
this.testOnReturn = testOnReturn;
}
}
  • 使用方法
1
2
3
4
5
6
7
8
9
10
11
12
# Add the "logstash" appender to log4j.rootLogger
log4j.rootLogger=WARN, Console, RollingFile, logstash

# log4j -> Redis appender configuration.
# Comments must be on their own line: in a .properties file, text after a value
# (including "#...") becomes part of the value.

# Class name of the RedisAppender shown above
log4j.appender.logstash=RedisAppender
log4j.appender.logstash.host=127.0.0.1
log4j.appender.logstash.port=6379
log4j.appender.logstash.password=123456
# Supported values: channel (PUBLISH) or list (LPUSH)
log4j.appender.logstash.keyType=channel
log4j.appender.logstash.key=logstash-log
log4j.appender.logstash.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.logstash.layout.locationInfo=true
  • 官方的json layout, 在pom文件添加以下内容
    1
    2
    3
    4
    5
    <dependency>
    <groupId>net.logstash.log4j</groupId>
    <artifactId>jsonevent-layout</artifactId>
    <version>1.7</version>
    </dependency>
1
2
3
// Usage example: log through the normal logging API; the appender forwards the event.
// NOTE(review): the sample properties set the root logger to WARN, so this DEBUG
// message is only forwarded if this logger's level is lowered elsewhere - confirm.
Logger logger = LoggerFactory.getLogger(getClass());
MDC.put("title","用户验证模块"); // Attach entry-point info to the log events that follow (see log4j MDC)
logger.debug("用户名验证失败!");

预览结果