当前位置: 首页 > news >正文

网站风格下载seo优缺点

网站风格下载,seo优缺点,怎么自己弄网站,人力资源网站建设项目环境:centOS7.9 mariadb5.6 celery5.0 kafka3.6.1 项目时间:2025年1月 项目描述:这个项目搭建了一个基于 Nginx 和 Flask 的 Web 集群,使用 Filebeat 将 Nginx 的访问日志发送到 Kafka 集群。通过 Python 消费者程序解析日志…

项目环境:centOS7.9 mariadb5.6 celery5.0 kafka3.6.1

项目时间:2025年1月

项目描述:这个项目搭建了一个基于 Nginx 和 Flask 的 Web 集群,使用 Filebeat 将 Nginx 的访问日志发送到 Kafka 集群。通过 Python 消费者程序解析日志并存储到 MySQL 数据库中,最后使用 Celery 监控数据库中的流量数据,当流量超过阈值时发送邮件告警。

项目目标

搭建一个日志收集平台,用于监控 Nginx 的 Web 访问日志流量。通过反向代理、日志收集、数据处理和实时监控,实现流量异常告警功能。

系统架构

1.Web 集群:

使用两台 Nginx 服务器搭建反向代理集群

用户请求通过 Nginx 被代理到后端的 Flask 程序

Nginx 生成访问日志(access.log)

2.日志收集:

使用 Filebeat 监听 Nginx 的 access 日志

Filebeat 将日志实时发送到 Kafka 集群

3.消息队列:

Kafka 集群包含三个 Broker 节点

创建 Kafka Topic,每个 Topic 包含三个分区和三个副本,确保高可用性和负载均衡

4.日志处理与存储:

使用 Python-kafka 编写的消费者程序并发消费 Kafka 中的日志数据

提取日志中的关键信息(如 IP、流量、时间、省份、运营商等)

将清洗后的数据存储到 MySQL 数据库中

5.流量监控与告警:

使用 Celery 定时任务监控 MySQL 数据库中的流量数据

如果某一分钟内的流量超过设定阈值(过高或过低),触发邮件告警

架构图

关键技术栈

Nginx:提供反向代理服务,生成访问日志

Filebeat:轻量级日志收集器,将日志发送到 Kafka

Kafka:分布式消息队列,存储和转发日志数据

Python-kafka:用于编写 Kafka 消费者程序

MySQL:存储清洗后的日志数据

Celery:分布式任务队列,用于定时监控和告警

SMTP:用于发送邮件告警

实现步骤

1.环境准备

依赖软件安装

yum源配置:cd /etc/yum.repos.dmkdir repomv *.repo repo/下载阿里云源:curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo下载依赖软件:
yum install epel-release -y
yum install wget vim java-11-openjdk.x86_64  -y

配置静态ip地址,修改/etc/sysconfig/network-scripts/ifcfg-ens33

PROXY_METHOD=none
BROWSER_ONLY=no
BOOTPROTO=none
DEFROUTE=yes
IPV4_FAILURE_FATAL=no
NAME=ens33
UUID=0f3239b9-6ba7-406e-94e8-fa7b680a4d82
DEVICE=ens33
ONBOOT=yes
IPADDR=192.168.20.163                   
NETMASK=255.255.255.0
GATEWAY=192.168.20.2
DNS1=114.114.114.114

配置主机名

hostnamectl set-hostname kafka1

修改/etc/hosts文件,添加主机名和ip地址映射

192.168.20.161  kafka1
192.168.20.162  kafka2
192.168.20.163  kafka3

关闭防火墙与selinux

关闭防火墙:iptables -F                #清空防火墙规则systemctl stop firewalld   #关闭防火墙服务systemctl disable firewalld  #设置开机不自启
关闭selinux,编辑/etc/selinux/config 文件SELINUX=disabled
重启系统:reboot

2.部署kafka集群

下载kafka

cd /opt
wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz

解压缩

tar xf kafka_2.13-3.6.1.tgz 
cd kafka_2.13-3.6.1

修改配置文件,位于kafka目录下config/kraft/server.properties

#修改节点id,每个节点唯一
node.id=1#修改控制器投票列表
controller.quorum.voters=1@192.168.223.161:9093,2@192.168.223.162:9093,3@192.168.223.163:9093#修改监听器和控制器,绑定ip。其中kafka1为主机名,可用本机ip地址代替
listeners=PLAINTEXT://kafka1:9092,CONTROLLER://kafka1:9093# 侦听器名称、主机名和代理将向客户端公布的端口.(broker 对外暴露的地址)
# 如果未设置,则使用"listeners"的值.
advertised.listeners=PLAINTEXT://kafka3:9092

配置文件详解

############################# Server Basics ############################## 此服务器的角色。设置此项将进入KRaft模式(controller 相当于主机、broker 节点相当于从机,主机类似 zk 功能)
process.roles=broker,controller# 节点 ID
node.id=2# 全 Controller 列表
controller.quorum.voters=2@192.168.58.130:9093,3@192.168.58.131:9093,4@192.168.58.132:9093############################# Socket Server Settings ############################## 套接字服务器侦听的地址.
# 组合节点(即具有`process.roles=broker,controller`的节点)必须至少在此处列出控制器侦听器
# 如果没有定义代理侦听器,那么默认侦听器将使用一个等于java.net.InetAddress.getCanonicalHostName()值的主机名,
# 带有PLAINTEXT侦听器名称和端口9092
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#不同服务器绑定的端口
listeners=PLAINTEXT://192.168.58.130:9092,CONTROLLER://192.168.58.130:9093# 用于代理之间通信的侦听器的名称(broker 服务协议别名)
inter.broker.listener.name=PLAINTEXT# 侦听器名称、主机名和代理将向客户端公布的端口.(broker 对外暴露的地址)
# 如果未设置,则使用"listeners"的值.
advertised.listeners=PLAINTEXT://192.168.58.130:9092# controller 服务协议别名
# 控制器使用的侦听器名称的逗号分隔列表
# 如果`listener.security.protocol.map`中未设置显式映射,则默认使用PLAINTEXT协议
# 如果在KRaft模式下运行,这是必需的。
controller.listener.names=CONTROLLER# 将侦听器名称映射到安全协议,默认情况下它们是相同的。(协议别名到安全协议的映射)有关更多详细信息,请参阅配置文档.
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL# 服务器用于从网络接收请求并向网络发送响应的线程数
num.network.threads=3# 服务器用于处理请求的线程数,其中可能包括磁盘I/O
num.io.threads=8# 套接字服务器使用的发送缓冲区(SO_SNDBUF)
socket.send.buffer.bytes=102400# 套接字服务器使用的接收缓冲区(SO_RCVBUF)
socket.receive.buffer.bytes=102400# 套接字服务器将接受的请求的最大大小(防止OOM)
socket.request.max.bytes=104857600############################# Log Basics ############################## 存储日志文件的目录的逗号分隔列表(kafka 数据存储目录)
#log.dirs=/usr/kafka/kafka_2.13-3.6.1/datas
log.dirs=/tmp/kraft-combined-logs# 每个主题的默认日志分区数。更多的分区允许更大的并行性以供使用,但这也会导致代理之间有更多的文件。
num.partitions=1# 启动时用于日志恢复和关闭时用于刷新的每个数据目录的线程数。
# 对于数据目录位于RAID阵列中的安装,建议增加此值。
num.recovery.threads.per.data.dir=1############################# Internal Topic Settings  #############################
# 组元数据内部主题"__consumer_offsets"和"__transaction_state"的复制因子
# 对于除开发测试以外的任何测试,建议使用大于1的值来确保可用性,例如3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1############################# Log Flush Policy ############################## 消息会立即写入文件系统,但默认情况下,我们只使用fsync()进行同步
# 操作系统缓存延迟。以下配置控制将数据刷新到磁盘.
# 这里有一些重要的权衡:
#    1. Durability(持久性): 如果不使用复制,未清理的数据可能会丢失
#    2. Latency(延迟): 当刷新发生时,非常大的刷新间隔可能会导致延迟峰值,因为将有大量数据要刷新.
#    3. Throughput(吞吐量): 刷新通常是最昂贵的操作,较小的刷新间隔可能导致过多的寻道.
# 下面的设置允许配置刷新策略,以便在一段时间后或每N条消息(或两者兼有)刷新数据。这可以全局完成,并在每个主题的基础上覆盖# 强制将数据刷新到磁盘之前要接受的消息数
#log.flush.interval.messages=10000# 在我们强制刷新之前,消息可以在日志中停留的最长时间
#log.flush.interval.ms=1000############################# Log Retention Policy ############################## 以下配置控制日志段的处理。可以将该策略设置为在一段时间后删除分段,或者在累积了给定大小之后删除分段。
# 只要满足这些条件中的任意一个,segment就会被删除。删除总是从日志的末尾开始# 日志文件因使用年限而有资格删除的最短使用年限
log.retention.hours=168# 基于大小的日志保留策略。除非剩余的段低于log.retention.bytes,否则将从日志中删除段。独立于log.retention.hours的函数。
#log.retention.bytes=1073741824# 日志segment文件的最大大小。当达到此大小时,将创建一个新的日志segment
log.segment.bytes=1073741824# 检查日志segments以查看是否可以根据保留策略删除它们的间隔
log.retention.check.interval.ms=300000
  • 创建集群
cd   /opt/kafka_2.13-3.6.1
# 在其中一台执行,生成集群UUID命令,拿到集群UUID保存在当前tmp_random文件中
bin/kafka-storage.sh random-uuid >tmp_random
# 查看uuid[root@chainmaker1 kafka_2.13-3.6.1]# cat tmp_random
z3oq9M4IQguOBm2rt1ovmQ# 在所有机器上执行,它会初始化存储区域,为 Kafka 集群的元数据存储和后续操作做好准备。z3oq9M4IQguOBm2rt1ovmQ为自己生成的集群uuidbin/kafka-storage.sh format -t z3oq9M4IQguOBm2rt1ovmQ -c /opt/kafka_2.13-3.6.1/config/kraft/server.properties
  • 启动
启动:
bin/kafka-server-start.sh -daemon /opt/kafka_2.13-3.6.1/config/kraft/server.properties
关闭:
bin/kafka-server-stop.sh
    • 命令行启动
  • 使用systemctl管理服务 -- systemd
## 编辑文件  /usr/lib/systemd/system/kafka.service [Unit]Description=Apache Kafka server (KRaft mode)Documentation=http://kafka.apache.org/documentation.htmlAfter=network.target[Service]Type=forkingUser=rootGroup=rootEnvironment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin:/usr/lib/jvm/java-11-openjdk-11.0.23.0.9-2.el7_9.x86_64/bin/"ExecStart=/opt/kafka_2.13-3.6.1/bin/kafka-server-start.sh -daemon /opt/kafka_2.13-3.6.1/config/kraft/server.propertiesExecStop=/opt/kafka_2.13-3.6.1/bin/kafka-server-stop.shRestart=on-failure[Install]WantedBy=multi-user.target#重新加载systemd配置systemctl daemon-reload#启动kafka服务systemctl  start  kafka#关闭kafka服务systemctl  stop  kafka#设置开机自启systemctl enable kafka
  • 测试集群Kraft模式下Kafka脚本的使用-阿里云开发者社区 (aliyun.com)
# 创建topic 
bin/kafka-topics.sh --create --bootstrap-server kafka3:9092 --replication-factor 3 --partitions 3 --topic my_topic** --replication-factor指定副本因子,--partitions指定分区数,--topic指定主题名称。# 查看topicbin/kafka-topics.sh --list --bootstrap-server kafka3:9092#创建生产者,发送消息,测试用
bin/kafka-console-producer.sh --broker-list kafka3:9092 --topic my_topic#创建消费者,获取数据,测试用
bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic my_topic --from-beginning

3.部署filebeat

  • 一篇文章搞懂filebeat(ELK) - 一寸HUI - 博客园

安装

1、rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
2、编辑 vim /etc/yum.repos.d/fb.repo
[elastic-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
3、yum安装
yum  install  filebeat -yrpm -qa  |grep filebeat  #可以查看filebeat有没有安装  rpm -qa 是查看机器上安装的所有软件包
rpm -ql  filebeat  查看filebeat安装到哪里去了,牵扯的文件有哪些

配置,修改配置文件/etc/filebeat/filebeat.yml

filebeat.inputs:
- type: log# Change to true to enable this input configuration.enabled: true# Paths that should be crawled and fetched. Glob based paths.paths:- /var/log/nginx/access.log - /var/log/nginx/error.log
#==========------------------------------kafka-----------------------------------
output.kafka:hosts: ["192.168.20.161:9092","192.168.20.162:9092","192.168.20.163:9092"]topic: nginxlogkeep_alive: 10s

创建主题

cd /opt/kafka_2.13-3.6.1
bin/kafka-topics.sh --create --bootstrap-server  kafka3:9092 --replication-factor 3 --partitions 3 --topic nginxlog

启动服务

systemctl start  filebeat
systemctl enable filebeat  #设置开机自启

4.nginx反向代理集群搭建

安装nginx

yum install epel-release -y
yum install nginx -y

编辑配置文件 /etc/nginx/conf.d/sc.conf

upstream flask {server 192.168.20.162:5000;server 192.168.20.163:5000;}server {server_name www.sc.com;location / {proxy_pass http://flask;}}

启动nginx

systemctl start nginx

4.后端flask程序

安装flask环境

yum install python3 -y
pip3 install flask -i https://pypi.tuna.tsinghua.edu.cn/simple

编辑/opt/python-flask/app.py文件

from flask import Flaskapp = Flask(__name__)@app.route("/")
def index():return "this is flask web kafka2"app.run(host = "0.0.0.0")

启动flask

python3 app.py

4.消费nginx日志

使用kafka-python并发消费nginx日志,进行清洗

查看解析IP网址http://whois.pconline.com.cn/ipJson.jsp?ip=123.123.123.123&json=true

# -*- coding: utf-8 -*-
from kafka import KafkaConsumer
from multiprocessing import Process, current_process
import time
import json
import pymysql
import requests
from datetime import datetime# 配置信息
IP_URL = "https://whois.pconline.com.cn/ipJson.jsp?json=true&ip="
DB_HOST = "192.168.140.159"
DB_PORT = 3306
DB_USER = "sc"
DB_PASSWD = "Sctl@123456"
DB = "test2"# 将传入的 JSON 字符串转换成字典格式
def json_to_dict(message: str) -> dict:d1 = {}try:d1 = json.loads(message)except:print("输入信息非 JSON 格式")return d1# 解析给定的 IP 地址
def resolve_ip(ip):#if ip.startswith("192.168.") or ip.startswith("10.") or ip.startswith("172.16."):# return "局域网", "局域网"url = IP_URL + ipresponse = requests.get(url)data = response.json()prov = data.get("pro")isp = data.get("addr").split()[1]return prov, isp
# 时间格式转换
def time_deformat(time_str):format_str = "%d/%b/%Y:%H:%M:%S"struct_time = time.strptime(time_str, format_str)result_time = time.strftime("%Y-%m-%d %H:%M:%S", struct_time)return result_time# 处理日志字符串
def handler_log(log_str):log_str = log_str.split()ip = log_str[0]time_str = log_str[3][1:]flow = log_str[9]prov, isp = resolve_ip(ip)time = time_deformat(time_str)return time, ip, prov, isp, flowdef consume_kafka_partition(topic, group_id, partition):"""进程执行的函数,用于消费指定分区的 Kafka 消息"""consumer = KafkaConsumer(group_id=group_id,bootstrap_servers=['192.168.140.158:9092', '192.168.140.159:9092', '192.168.140.160:9092'],auto_offset_reset='earliest',enable_auto_commit=True,auto_commit_interval_ms=5000,value_deserializer=lambda x: x.decode('utf-8'))consumer.subscribe([topic])conn = pymysql.connect(host=DB_HOST,user=DB_USER,password=DB_PASSWD,port=DB_PORT,database=DB)cur = conn.cursor()for message in consumer:# print(f"message: {message}")# print(f"进程 {current_process().name} 消费到来自分区 {partition} 的消息:{message.value}")result_dict = json_to_dict(message.value)log_str = result_dict.get("message")print(f"message:{log_str}")result = handler_log(log_str)sql = "INSERT INTO nginx_log (date_time, ip, province, ISP, flow) VALUES (%s, %s, %s, %s, %s)"cur.execute(sql, result)conn.commit()if __name__ == "__main__":topic = "nginxlog"group_id = "message_group40"partitions = [0, 1, 2]def start_process(target, args):p = Process(target=target, args=args)p.name = f"Consumer-Partition-{args[2]}"p.start()return pprocesses = []for partition in partitions:p = start_process(consume_kafka_partition, (topic, group_id, partition))processes.append(p)for p in processes:p.join()print("所有进程已结束,指定分区消费完成")

5.celery部署

redis安装

yum install redis -y

redis 配置文件修改 /etc/redis.conf

bind 0.0.0.0   #监听本机任意ip

启动服务

systemctl start redis

redis详解

 redis:key-value 存储系统,是跨平台的非关系型数据库。redis支持的存储类型:
String: 字符串
Hash: 散列
List: 列表
Set: 集合
Sorted Set: 有序集合可以做消息中间件,可以做消息队列,可以做缓存  -- memcacheredis持久化:
RDB 持久化是通过对 Redis 中的数据进行快照(snapshot)来实现的。在指定的时间间隔内,Redis 会将内存中的数据集快照写入磁盘上的一个临时文件,成功后再将这个临时文件替换为之前的 RDB 文件。
AOF 持久化是以日志的形式记录 Redis 服务器所执行的每一个写操作(如 SET、LPUSH 等命令)。这些写操作命令会按照执行的先后顺序追加到 AOF 文件的末尾。

python库安装

pip3 install celery -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
pip3 install redis

celery配置

cd /opt
mkdir monitor/celery_app -p在celery_app中
1、编辑配置文件  config.py
from celery.schedules import crontab
BROKER_URL = 'redis://192.168.20.161:6379/0' # Broker配置,使用Redis作为消息中间件
CELERY_RESULT_BACKEND = 'redis://192.168.20.161:6379/1' # BACKEND配置,这里使用redis
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置
CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个'celery_app.task',
)CELERYBEAT_SCHEDULE = {'celery_app.task.test': {'task': 'celery_app.task.test','schedule': crontab(minute='*/1'),'args': (-3, 10)}}2、编辑__init__.py  (双下划线开头,双下划线结尾)
from celery import Celery
app = Celery('task')
app.config_from_object('celery_app.config')3、编辑task.py
from . import app@app.task
def test(a,b):print("task test start ...")result = abs(a) + abs(b)print("task test end....")return result

在celery_app中

1、编辑配置文件 config.py
from celery.schedules import crontabBROKER_URL = 'redis://192.168.140.160:6379/0'
CELERY_RESULT_BACKEND = 'redis://192.168.140.160:6379/1'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_IMPORTS = ('celery_app.task',)  # 确保导入任务模块# 定时任务配置(每分钟检查一次)
CELERYBEAT_SCHEDULE = {'check-traffic-every-minute': {'task': 'celery_app.task.check_traffic','schedule': crontab(minute='*/1'),'args': (100,)  # 传递阈值参数}
}
2、编辑__init__.py (双下划线开头,双下划线结尾)
from celery import Celery
app = Celery('task')
app.config_from_object('celery_app.config')
3、编辑task.py
from . import app
import pymysql
import smtplib
from email.mime.text import MIMEText
from email.header import Header
from email.utils import formataddr
import logging# 邮件发送配置
sender = '发送人@qq.com'
password = 'qvcaaeboyoqibfgf'  # 确保这是QQ邮箱的授权码
receiver = '接收人@qq.com'@app.task
def check_traffic(threshold=100):# 数据库连接配置conn = pymysql.connect(host='192.168.140.159',user='sc',password='Sctl@123456',port=3306,database='test2',)cur = conn.cursor()# 执行查询(优化查询条件)sql = "SELECT * FROM nginx_log WHERE flow > %s"cur.execute(sql, (threshold,))rows = cur.fetchall()# 关闭连接cur.close()conn.close()# 检查流量并发送邮件alarms = []for row in rows:flow = row[5]if flow > threshold:# 构建邮件内容mail_content = f"IP {row[2]} 的流量 {flow} 超过阈值 {threshold}!"message = MIMEText(mail_content, 'plain', 'utf-8')message['From'] = formataddr((str(Header("流量监控系统", 'utf-8')), sender))message['To'] = receivermessage['Subject'] = Header("流量报警", 'utf-8')# 发送邮件try:with smtplib.SMTP_SSL("smtp.qq.com", 465) as smtp:smtp.login(sender, password)smtp.sendmail(sender, receiver, message.as_string())logging.info(f"邮件发送成功:IP {row[2]}")except Exception as e:logging.error(f"邮件发送失败:{e}")alarms.append(f"报警:IP {row[2]} 的流量 {flow} 超过阈值 {threshold}")return "\n".join(alarms) if alarms else "流量正常"

celery 启动beat (在monitor目录下执行)

celery -A celery_app beat

celery 启动worker

 celery -A celery_app worker -l info -c 4

项目心得:

在这次项目中,我了解到了kafka,nginx,filebeat,celery以及发邮件等一系列知识,了解了整个项目的框架。在项目的搭建过程中,也曾因为配置出错、逻辑不通等等一系列问题而报错,但最终在我的不懈坚持与努力下,最终将这个项目跑通,我的QQ邮箱也收到了报警,感觉收获满满,对整个日志收集项目有了比较全面的了解

http://www.ds6.com.cn/news/55837.html

相关文章:

  • wordpress 电话插件网络搜索引擎优化
  • 怎么做健康咨询网站百度关键词推广条件
  • 武汉网址建站网络营销推广方式包括哪几种
  • 专题类的网站营销广告网站
  • 音乐网站用什么语言做外链网盘下载
  • 人民日报新闻客户端长沙seo推广
  • 新疆正能量免费下载杭州百度优化
  • 哪个网站做海报好东莞做网络推广的公司
  • 陕西 建设工程有限公司网站网站模板价格
  • 同时做几个网站的seoseo推广需要多少钱
  • 广州的做淘宝女鞋货源下载数据包的网站网址是多少?百度网址大全怎么设为主页
  • wordpress用户量上限海外网站seo优化
  • 建外贸网站公司seo平台代理
  • 珠海高端网站建设公司企业培训内容有哪些
  • 重庆微信网站开发公制作网站要找什么公司
  • 祁东网站开发网站收录查询平台
  • 广东网站建设最近国际新闻
  • 国微 网站建设2024年1月新冠高峰
  • 武汉 酒店 网站制作发帖推广百度首页
  • 哪里网站建设公司好社交网络的推广方法有哪些
  • 3d建模怎么做网站旋转知乎软文推广
  • 中山网站建设文化咨询百度网页翻译
  • 网站备案修改域名ip搜索引擎的设计与实现
  • 网站 审批号无锡网络推广外包
  • 智慧旅游网站建设方案企业网络推广技巧
  • 做网站后期怎么维护网络营销的收获与体会
  • 现在较为常用的网站开发技术seo网络推广是什么意思
  • 做家政公司网站外国搜索引擎登录入口
  • dedecms网站地图 显示三级栏目西安官网seo
  • wordpress主题站温州seo