nginx+lua+kafka实现日志统一收集汇总

一:场景描述
对于线上大流量服务或者需要上报日志的nginx服务,每天会产生大量的日志,这些日志非常有价值。可用于计数上报、用户行为分析、接口质量、性能监控等需求。但传统nginx记录日志的方式数据会散落在各自nginx上,而且大流量日志本身对磁盘也是一种冲击。
我们需要把这部分nginx日志统一收集汇总起来,收集过程和结果需要满足如下需求:
支持不同业务获取数据,如监控业务,数据分析统计业务,推荐业务等。
数据实时性
高性能保证

二:技术方案
得益于openresty和kafka的高性能,我们可以非常轻量高效的实现当前需求,架构如下:

方案描述:
1:线上请求打向nginx后,使用lua完成日志整理:如统一日志格式,过滤无效请求,分组等。
2:根据不同业务的nginx日志,划分不同的topic。
3:lua实现producter异步发送到kafka集群。
4:对不同日志感兴趣的业务组实时消费获取日志数据。

三:相关技术
openresty: http://openresty.org
kafka: http://kafka.apache.org
lua-resty-kafka: https://github.com/doujiang24/lua-resty-kafka

四:安装配置
为了简单直接,我们采用单机形式配置部署,集群情况类似。
1)打包openresty:

# wget https://aur.archlinux.org/cgit/aur.git/snapshot/openresty.tar.gz
# tar xzvf openresty.tar.gz
# cd openresty
# makepkg  --skippgpcheck
# pacman -U 

2)安装lua-resty-kafka

#下载lua-resty-kafka:  
wget https://github.com/doujiang24/lua-resty-kafka/archive/v0.06.zip
unzip v0.06.zip
#拷贝lua-resty-kafka到openresty  
mkdir /opt/openresty/lualib/kafka  
cp -rf /opt/nginx/lua-resty-kafka-master/lib/resty /opt/openresty/lualib/kafka/  

3):安装kafka

cd /srv/
wget http://apache.fayea.com/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz
tar xvf kafka_2.11-0.10.2.1.tgz
# 开启单机zookeeper  
nohup sh bin/zookeeper-server-start.sh config/zookeeper.properties > ./zk.log 2>&1 &  
# 绑定broker ip,必须绑定  
#在config/servier.properties下修改host.name  
#host.name={your_server_ip}  
# 启动kafka服务  
nohup sh bin/kafka-server-start.sh config/server.properties > ./server.log 2>&1 &  
# 创建测试topic  
sh bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test1 --partitions 1 --replication-factor 1  

五:配置运行
编辑/etc/nginx/conf/nginx.conf 实现kafka记录nginx日志功能,源码如下:

worker_processes  12;  
   
events {  
    use epoll;  
    worker_connections  65535;  
}  
   
http {  
    include       mime.types;  
    default_type  application/octet-stream;  
    sendfile        on;  
    keepalive_timeout  0;  
    gzip on;  
    gzip_min_length  1k;  
    gzip_buffers     4 8k;  
    gzip_http_version 1.1;  
    gzip_types       text/plain application/x-javascript text/css application/xml application/X-JSON;  
    charset UTF-8;  
    # 配置后端代理服务  
    upstream rc{  
        server 10.10.*.15:8080 weight=5 max_fails=3;
        # 最大长连数  
        keepalive 32;  
    }  
    # 配置lua依赖库地址  
    lua_package_path "/path/to/lua-resty-kafka/lib/?.lua;;";  
   
    server {  
        listen       80;  
        server_name  localhost;  
        location /favicon.ico {  
            root   html;  
                index  index.html index.htm;  
        }  
        location / {  
            proxy_connect_timeout 8;  
            proxy_send_timeout 8;  
            proxy_read_timeout 8;  
            proxy_buffer_size 4k;  
            proxy_buffers 512 8k;  
            proxy_busy_buffers_size 8k;  
            proxy_temp_file_write_size 64k;  
            proxy_next_upstream http_500 http_502  http_503 http_504  error timeout invalid_header;  
            root   html;  
            index  index.html index.htm;  
            proxy_pass http://rc;  
            proxy_http_version 1.1;  
            proxy_set_header Connection "";  
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;  
            # 使用log_by_lua 包含lua代码,因为log_by_lua指令运行在请求最后且不影响proxy_pass机制  
            log_by_lua '  
                -- 引入lua所有api  
                local cjson = require "cjson"  
                local producer = require "resty.kafka.producer"  
                -- 定义kafka broker地址,ip需要和kafka的host.name配置一致  
                local broker_list = {  
                    { host = "10.10.78.52", port = 9092 },  
                }  
                -- 定义json便于日志数据整理收集  
                local log_json = {}  
                log_json["uri"]=ngx.var.uri  
                log_json["args"]=ngx.var.args  
                log_json["host"]=ngx.var.host  
                log_json["request_body"]=ngx.var.request_body  
                log_json["remote_addr"] = ngx.var.remote_addr  
                log_json["remote_user"] = ngx.var.remote_user  
                log_json["time_local"] = ngx.var.time_local  
                log_json["status"] = ngx.var.status  
                log_json["body_bytes_sent"] = ngx.var.body_bytes_sent  
                log_json["http_referer"] = ngx.var.http_referer  
                log_json["http_user_agent"] = ngx.var.http_user_agent  
                log_json["http_x_forwarded_for"] = ngx.var.http_x_forwarded_for  
                log_json["upstream_response_time"] = ngx.var.upstream_response_time  
                log_json["request_time"] = ngx.var.request_time  
                -- 转换json为字符串  
                local message = cjson.encode(log_json);  
                -- 定义kafka异步生产者  
                local bp = producer:new(broker_list, { producer_type = "async" })  
                -- 发送日志消息,send第二个参数key,用于kafka路由控制:  
                -- key为nill(空)时,一段时间向同一partition写入数据  
                -- 指定key,按照key的hash写入到对应的partition  
                local ok, err = bp:send("test1", nil, message)  
   
                if not ok then  
                    ngx.log(ngx.ERR, "kafka send err:", err)  
                    return  
                end  
            ';  
        }  
        error_page   500 502 503 504  /50x.html;  
        location = /50x.html {  
            root   html;  
        }  
    }  
}  

六:检测&运行

# 检测配置,只检测nginx配置是否正确,lua错误日志在nginx的error.log文件中  
systemctl restart nginx

七:测试
1:使用任意http请求发送给当前nginx,如:

http://172.16.1.240:1010/m/personal/AC8E3BC7-6130-447B-A9D6-DF11CB74C3EF/rc/v1?passport=83FBC7337D681E679FFBA1B913E22A0D@qq.sohu.com&page=2&size=10

2:查看upstream代理是否工作正常

3:查看kafka 日志对应的topic是否产生消息日志,如下:

# 从头消费topic数据命令
sh kafka-console-consumer.sh --zookeeper 10.10.78.52:2181 --topic test1 --from-beginning

关于Zeno Chen

本人涉及的领域较多,杂而不精 程序设计语言: Perl, Java, PHP, Python; 数据库系统: MySQL,Oracle; 偶尔做做电路板的开发,主攻STM32单片机
此条目发表在Linux分类目录。将固定链接加入收藏夹。