Web日志分析工具

1. Web日志分析工具

一年前,我写了一个Web日志进行分析并按照分类导出到Excel的工具
一年后,我在AI的加持帮助下对这个工具代码进行了重构
通过Web页面的方式进行呈现,支持了更多的日志解析,更灵活的使用

2. 为什么要重构这个工具

想法

  1. 日志分析的过程中总会遇到性能瓶颈和查找指定相关数据的瓶颈,最终只需要日志中的部分数据,只需要一个小工具获取想要的结果就可以了,暂时不考虑重量级产品全流量匹配
  2. 会遇到跨平台代码处理的问题,既然都跨平台了直接搞个Web版本更直观
  3. 无论是流量匹配还是日志匹配,归根结底还是通过正则表达式的方式去做,随着AI时代的猛烈迭代,我尝试使用AI去编码看看能否给我一个其它的处理数据的方式,尝试了不同的AI产品,例如LLAMA3,ChatGPT,Gemini等相关AI产品,最终的结论依旧是通过正则表达式的方式匹配
  4. 既然使用正则表达式的方式,我依旧在想能不能搞个自动匹配的方式实现,只需要上传日志脚本自动分析匹配最后输出结果,但事实是其实电脑也同样看不懂猜不出来日志中的每个字段究竟是什么,希望未来的AI可以猜出来人类想要什么,哪怕是给个选项!
  5. 在可以输出结果的同时顺便统计出次数数量方便进行统计工作,开始打算使用图表的方式去做最终没做,无法适配不同的图表不如给出输出的数据自己用其它的工具去做,这样更灵活
  6. 在第一版的Web日志分析工具中,输出的是Excel格式,这次直接被我取消了
    1. 原因1:Excel文件需要依赖Microsoft Excel或其他兼容工具
    2. 原因2:Excel在处理非常大的数据集时可能会变得非常缓慢
    3. 原因3:而如果需要人类易读的格式,并且计划由用户手动操作数据,Excel可能是更方便的选择,但它的处理能力不如CSV和JSON强大
  7. 这次使用的CSV和JSON格式进行输出
    1. CSV格式兼容性更好,几乎所有的数据分析工具、数据库和编程语言都能轻松处理CSV格式。它是一种通用的文本格式,可以在不同平台和工具之间无缝传输
    2. CSV文件结构简单,数据存储在纯文本中,适合快速读取和处理,如果后续有其它工具进行调用的话更方便
    3. 为什么增加了JSON呢,因为简单易懂:JSON采用键值对的形式,结构清晰,容易理解。即使没有专业的编程背景的人也能很快理解JSON的数据结构,JSON跨平台兼容性也不错

实现过程

参考了几个框架最终选择的streamlit框架,别问为啥,对我来说无论用什么好用就行

按照思路:
第一步上传文件能不能上传成功,日志大小问题
第二步使用正则匹配日志,能不能匹配上选择的日志,反反复复的找了很多日志去测试,确实非常耗时的
第三步输出日志,能输出结果就好了,别输出报错或者无法下载就行

代码量虽然不大但中途改了很多次代码,主要改的都是正则表达式用来匹配日志的,中途反反复复的匹配日志,最终这事儿成了

中途遇到的困难

  1. 困难还是有的,一年前第一版的Web日志分析工具改正则表达式匹配日志确实挺难的,没想打一年后有AI可以帮我干了,在LLAMA3,ChatGPT,Gemini等相关AI工具的帮助下写正则表达式方便多了,每家AI输出的正则表达式大同小异,不行就多试几次比手写快
  2. 中途大胆的做了些尝试,把我的代码全部交给AI例如LLAMA3,ChatGPT,Gemini等相关AI产品,让AI帮我修改全部的代码重构去实现最初的想法只需要上传日志脚本自动分析匹配最后输出结果,但结果是不行,等以后有其它的方式可以做到我在升级V3版本的Web日志分析工具吧,目前确实不太好做
  3. 既然没办法实现全自动的方式,最终把Apache、Nginx、IIS和Tomcat常用的日志格式进行正则表达式进行匹配,不像V1版本一样只处理Nginx日志
  4. 希望未来的AI工具更好用,以后有条件了一定搞个全自动的,因为目前增加新的日志格式的话还是要增加正则的规则,我不想改正则了。

未来的计划(还没实现)

  1. 上传日志,脚本全自动分析匹配,输出结果
  2. 上传日志,解析全部日志字段,手动打类型标签,输出结果

3. 功能介绍

  1. 支持多种日志格式:支持Apache、Nginx、IIS和Tomcat日志格式。
  2. 自动识别日志格式:自动识别日志格式,无需手动配置。
  3. 高效分析:快速分析大型日志文件,获取有关网站访问量、用户行为和系统性能的详细信息。
  4. 灵活筛选:支持多种筛选条件,例如日期范围、请求方法、请求URL等。
  5. 详细统计:支持多种统计条件,例如请求方法、请求URL、状态码等。
  6. 导出文件:支持导出CSV和JSON文件,方便进一步分析和处理。

4. 项目地址

以下是完成的Python代码

# Author: liqixin
# Mail: [email protected]
# Web: https://www.qixinlee.com

import streamlit as st
import pandas as pd
import json
import logging
import os
import re
from datetime import datetime
from typing import List, Dict

# 设置日志级别和格式
logging.basicConfig(filename='log.log', format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

# 定义日志解析器的基类
class LogParser:
    def __init__(self, log_file: bytes):
        self.log_file = log_file

    def _parse_log(self, log_pattern: re.Pattern) -> List[Dict]:
        parsed_log_entries = []
        for line in self.log_file:
            line = line.decode('utf-8')
            match = log_pattern.findall(line)
            if match:
                for match_item in match:
                    request = match_item[3]
                    request_parts = request.split(' ', 2)
                    if len(request_parts) == 3:
                        request_method, request_url, http_protocol = request_parts
                    else:
                        request_method = request_parts[0]
                        request_url = ''
                        http_protocol = ''
                    parsed_log_entries.append({
                        'remote_addr': match_item[0],
                        'remote_user': match_item[1],
                        'time_local': match_item[2],
                        'request_method': request_method,
                        'request_url': request_url,
                        'http_protocol': http_protocol,
                        'status': match_item[4],
                        'body_bytes_sent': match_item[5],
                        'http_referer': match_item[6],
                        'http_user_agent': match_item[7]
                    })
        return parsed_log_entries

    def parse(self) -> List[Dict]:
        raise NotImplementedError("必须实现解析方法")

# Apache 日志解析器
class ApacheLogParser(LogParser):
    def parse(self) -> List[Dict]:
        log_pattern = re.compile(r'(\S+) - (\S+) \[(.*?)\] "(.*?)" (\d+) (\d+) "(.*?)" "(.*?)"', re.DOTALL)
        return self._parse_log(log_pattern)

# Nginx 日志解析器
class NginxLogParser(LogParser):
    def parse(self) -> List[Dict]:
        log_pattern = re.compile(r'(\S+) - - \[(.*?)\] "(.*?)" (\d+) (\d+) "(.*?)" "(.*?)"', re.DOTALL)
        return self._parse_log(log_pattern)

# IIS 日志解析器
class IISLogParser(LogParser):
    def parse(self) -> List[Dict]:
        log_pattern = re.compile(r'(\S+) (\S+) (\S+) \[(.*?)\] "(.*?)" (\d+) (\d+) "(.*?)" "(.*?)"', re.DOTALL)
        return self._parse_log(log_pattern)

# Tomcat 日志解析器
class TomcatLogParser(LogParser):
    def parse(self) -> List[Dict]:
        log_pattern = re.compile(r'(\S+) (\S+) \[(.*?)\] "(.*?)" (\d+) (\d+) "(.*?)" "(.*?)"', re.DOTALL)
        return self._parse_log(log_pattern)

# 通用日志解析方法
class LogParserFactory:
    @staticmethod
    def create_parser(log_type: str, log_file: bytes) -> LogParser:
        if log_type == 'Apache':
            return ApacheLogParser(log_file)
        elif log_type == 'Nginx':
            return NginxLogParser(log_file)
        elif log_type == 'IIS':
            return IISLogParser(log_file)
        elif log_type == 'Tomcat':
            return TomcatLogParser(log_file)
        else:
            raise ValueError(f"不支持的日志类型: {log_type}")

# 定义导出文件函数
def export_file(df: pd.DataFrame, export_format: str) -> str:
    try:
        current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
        if export_format == 'CSV(逗号分隔值)':
            csv_file_path = f'log_entries_{current_time}.csv'
            df.to_csv(csv_file_path, index=False)
            return csv_file_path
        elif export_format == 'JSON(JavaScript对象表示法)':
            json_file_path = f'log_entries_{current_time}.json'
            df['time_local'] = df['time_local'].apply(lambda x: x.isoformat())
            df['date'] = df['date'].apply(lambda x: x.isoformat())
            with open(json_file_path, 'w', encoding='utf-8') as json_file:
                json.dump(df.to_dict(orient='records'), json_file, indent=4)
            return json_file_path
    except Exception as e:
        logging.error(f"导出文件失败:{e}")
        raise

# 定义Web应用程序
def main():
    # 设置主题
    st.set_page_config(layout="wide", page_title="安全运营 - Web日志分析工具")
    st.markdown("<style>body {background-color: #2F4F4F; color: #66D9EF;}</style>", unsafe_allow_html=True)
    st.markdown("<h1 style='text-align: center;'>安全运营 - Web日志分析工具</h1>", unsafe_allow_html=True)

    # 上传日志文件
    st.subheader('请选择日志文件')
    log_file = st.file_uploader('请选择日志文件', type=['log'])
    log_type = st.selectbox("请选择日志类型", ['Apache', 'Nginx', 'IIS', 'Tomcat'])

    if log_file is not None:
        try:
            # 使用工厂模式创建适当的日志解析器
            log_parser = LogParserFactory.create_parser(log_type, log_file)
            parsed_log_entries = log_parser.parse()

            if parsed_log_entries:
                # 日志解析结果
                st.subheader('日志解析结果')
                df = pd.DataFrame(parsed_log_entries)
                df['time_local'] = df['time_local'].apply(lambda x: datetime.strptime(x, '%d/%b/%Y:%H:%M:%S %z'))
                df['date'] = df['time_local'].dt.date
                st.write(df)

                # 选择日期范围
                st.subheader('请选择日期范围')
                col1, col2 = st.columns(2)
                start_date = col1.date_input('请选择开始日期')
                end_date = col2.date_input('请选择结束日期')
                filtered_df = df[(df['time_local'].dt.date >= start_date) & 
                                 (df['time_local'].dt.date <= end_date)]

                # 筛选功能
                st.subheader('筛选功能')
                st.subheader('请选择筛选条件')
                columns = filtered_df.columns.tolist()
                select_columns = st.multiselect('请选择筛选字段', columns)
                select_values = {}
                for column in select_columns:
                    if column == 'date':
                        select_values[column] = st.date_input(f'请选择{column}值')
                    else:
                        select_values[column] = st.selectbox(f'请选择{column}值', filtered_df[column].unique())
                filtered_df = filtered_df
                for column, value in select_values.items():
                    if column == 'date':
                        filtered_df = filtered_df[filtered_df[column] == value]
                    else:
                        filtered_df = filtered_df[filtered_df[column] == value]

                st.write(filtered_df)

                # 统计功能
                st.subheader('统计功能')
                rank_columns = filtered_df.columns.tolist()
                rank_columns_selected = st.multiselect('请选择统计字段', rank_columns)

                if rank_columns_selected:
                    for i, column in enumerate(rank_columns_selected):
                        rank_df = filtered_df[column].value_counts().reset_index()
                        rank_df.columns = [column, '次数']
                        
                        if i % 2 == 0:
                            col1, col2 = st.columns(2)
                            col1.write(rank_df)
                        else:
                            col2.write(rank_df)
                else:
                    st.write("请选择至少一个字段")
                
                # 导出文件功能
                st.subheader('请选择导出格式')
                export_format = st.selectbox('请选择导出格式', ['CSV(逗号分隔值)', 'JSON(JavaScript对象表示法)'])
                if export_format:
                    file_path = export_file(filtered_df, export_format)
                    with open(file_path, 'rb') as file:
                        if export_format == 'CSV(逗号分隔值)':
                            st.download_button('下载CSV文件', file, file_path.split('/')[-1])
                        elif export_format == 'JSON(JavaScript对象表示法)':
                            st.download_button('下载JSON文件', file, file_path.split('/')[-1])

            else:
                st.error('没有匹配的日志条目')
        except Exception as e:
            st.error(f"错误:{e}")

if __name__ == '__main__':
    main()

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注