This commit is contained in:
LiuJR 2024-06-30 17:10:11 +08:00
commit 45acf0cca5
8 changed files with 254 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
.idea/
*/__pycache__/
.vscode

0
README.md Normal file
View File

8
config/config.yaml Normal file
View File

@ -0,0 +1,8 @@
name: server_login_detection_config
dingding_robot_token:
access_token: "YOUR_ACCESS_TOKEN"
server_name:
name: "YOur_Server_Name"

3
requirements.txt Normal file
View File

@ -0,0 +1,3 @@
PyYAML==6.0.1
Requests==2.32.3
translate==3.6.1

36
run.py Normal file
View File

@ -0,0 +1,36 @@
# !/usr/bin/env python
# -*-coding:utf-8 -*-
"""
# File : run.py.py
# Time 2024/6/30 上午 12:23
# Modify sityliu
# Author :蓝陌
# version python 3.8
# Description服务器登录检测当检测到登录行为时自动发送钉钉消息
"""
import os
from src.message_sed import MessageSedes
sed = MessageSedes()
def monitoring_login_behavior():
login_num = 0
while True:
if login_num == 0:
print('开始监控登录行为......')
os.popen('inotifywait -e modify /var/log/wtmp > /dev/null').readlines()
sed.run()
login_num += 1
os.popen('inotifywait -e modify /var/log/wtmp > /dev/null').readlines()
login_num = 0
if __name__ == '__main__':
# sed.run()
monitoring_login_behavior()

View File

@ -0,0 +1,11 @@
# 添加以下内容到服务文件
[Unit]
Description=My Python App
[Service]
Type=simple
ExecStart=python3 /path/to/your/run.py
[Install]
WantedBy=multi-user.target

108
src/message_sed.py Normal file
View File

@ -0,0 +1,108 @@
# !/usr/bin/env python
# -*-coding:utf-8 -*-
"""
# File : message_sed.py
# Time 2024/6/30 上午 12:34
# Modify sityliu
# Author :蓝陌
# version python 3.8
# Description消息发送组建
"""
from datetime import datetime
import os
import json
import requests
import yaml
from translate import Translator
class MessageSedes():
def __init__(self):
self.config_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + '/config/config.yaml'
with open(self.config_path, encoding="utf-8") as yaml_file:
self.config_data = yaml.safe_load(yaml_file)
def translate_english_to_chinese(self, text):
translator = Translator(to_lang="zh")
translation = translator.translate(text)
return translation
def dingding_msg_sed(self, token, param):
current_time = datetime.now()
url = f'https://oapi.dingtalk.com/robot/send?access_token={token}'
headers = {'Content-Type': 'application/json;charset=utf-8'}
success_msg = f'<font color="#008000">{param["login_status"]}</font>'
data = {
"msgtype": "markdown",
"markdown": {
"title": "服务器登录通知",
"text":
"#### 服务器登录检测提醒:\n"
"您的服务器存在登陆行为,如果不是您正在操作请及时查明原因并及时处理。\n"
"\n"
f"> 服务器:{self.config_data['server_name']['name']}\n\n"
f"> 系统版本:{param['server_version']}\n\n"
f"> IP地址{param['server_ip']}\n\n"
f"> 发送时间:{current_time.strftime('%Y-%m-%d %H:%M:%S')}\n\n"
f"> 登陆方式:{param['login_method']}\n\n"
f"> 登陆账号:{param['login_id']}\n\n"
f"> 登陆终端:{param['login_terminal']}\n\n"
f"> 登陆IP{param['login_ip']}\n\n"
f"> 登陆归属地:{param['login_location']}\n\n"
f"> 登陆时间:{param['login_time']}\n\n"
f"> 登陆状态: {success_msg}\n\n"
},
}
requests.post(url, data=json.dumps(data), headers=headers)
def server_info(self):
out_ip = os.popen('curl -s https://api.ipify.org').readlines()
int_ip = os.popen("hostname -I | awk '{print $1}' ORS=''").readlines()
id = os.popen("cat /etc/os-release | grep '^ID=' | awk -F '=' '{print $2}' ORS=''").readlines()
version = os.popen("""cat /etc/os-release | grep '^VERSION=' | awk -F '=' '{print $2}' | awk -F '"' '{print $2}' ORS=''""").readlines()
uname = os.popen("uname -m").readlines()
server_login = os.popen("echo $SSH_CONNECTION").readlines()
if not server_login:
login_method = '其他登陆方式'
else:
login_method = '账号密码/密钥登陆'
last = os.popen('last -n 1').readlines()
ip_address = last[0].split()[2]
geoip = os.popen(f"curl -s 'http://ip-api.com/json/{ip_address}'").readlines()
geoip_json = geoip[0]
geoip = json.loads(geoip_json)
if geoip['status'] == 'fail':
text = geoip['message']
text = self. translate_english_to_chinese(text)
elif geoip['status'] == 'success':
text = geoip['country']+' '+geoip["regionName"]+' '+geoip["city"]
text = self. translate_english_to_chinese(text)
output = os.popen('last -n 1').readlines()
login_info = output[0].split()
login_time = login_info[3] + ' ' + login_info[4] + ' ' + login_info[5] + ' ' + login_info[6]
login_status = login_info[7]+ ' ' + login_info[8] + ' ' + login_info[9]
server_info = {
"server_name": self.config_data['server_name']['name'],
"server_ip": out_ip[0]+'(外) '+int_ip[0]+'(内)',
"server_version": id[0]+ ' ' +version[0] + ' 架构: ' + uname[0],
"login_method": login_method,
"login_id": last[0].split()[0],
"login_terminal": last[0].split()[1],
"login_ip": last[0].split()[2],
"login_location": text,
"login_time": login_time,
"login_status": login_status,
}
return server_info
def run(self):
print("检测到登陆行为,开始发送通知......")
parameter = self.server_info()
self.dingding_msg_sed(self.config_data['dingding_robot_token']['access_token'], parameter)

84
test/test_info.py Normal file
View File

@ -0,0 +1,84 @@
import json
import os
import time
# info_1 = os.popen('inotifywait -e modify /var/log/wtmp > /dev/null').readlines()
# print(info_1)
info_1 = os.popen('curl -s https://api.ipify.org').readlines()
print(info_1[0])
info_2 = os.popen("hostname -I | awk '{print $1}' ORS=''").readlines()
print(info_2[0])
info_3 = os.popen("cat /etc/os-release | grep '^ID=' | awk -F '=' '{print $2}' ORS=''").readlines()
info_4 = os.popen("""cat /etc/os-release | grep '^VERSION=' | awk -F '=' '{print $2}' | awk -F '"' '{print $2}' ORS=''""").readlines()
info_5 = os.popen("uname -m").readlines()
print(info_3[0]+ ' ' +info_4[0] + ' ' +info_5[0])
info_6 = os.popen("'echo $SSH_CONNECTION' ORS=''").readlines()
print(info_6)
print(type(info_6))
if not info_6:
login_method = '其他登陆方式'
else:
login_method = 'SSH登陆'
last = os.popen('last -n 1').readlines()
print(last[0])
print(last[0].split()[0])
print(last[0].split()[1])
print(last[0].split()[2])
output = os.popen('last -n 1').readlines()
login_info = output[0].split()
login_time = login_info[3] + ' ' + login_info[4] + ' ' + login_info[5] + ' ' + login_info[6]
print("Login Time:", login_time)
print(login_info[7]+ ' ' + login_info[8] + ' ' + login_info[9])
geoip = os.popen(f"curl -s 'http://ip-api.com/json/10.168.1.159'").readlines()
print(geoip)
geoip_json = geoip[0]
geoip = json.loads(geoip_json)
print(type(geoip))
if geoip['status'] == 'fail':
print(geoip['message'])
elif geoip['status'] == 'success':
print(geoip['country'],geoip["regionName"], geoip["city"])
text = geoip['country']+' '+geoip["regionName"]+' '+geoip["city"]
server_info = {
"server_name": 'server_name',
"server_ip": info_1[0]+'(外) '+info_2[0]+'(内)',
"server_info": info_3[0]+ ' ' +info_4[0] + ' ' +info_5[0],
"login_method": login_method,
}
print(server_info)
from translate import Translator
def translate_english_to_chinese(text):
translator = Translator(to_lang="zh")
translation = translator.translate(text)
return translation
english_text = "Hello, how are you?"
chinese_text = translate_english_to_chinese(english_text)
# print(chinese_text)
# print(translate_english_to_chinese(geoip['message']))
# print(translate_english_to_chinese(text))
lastlog = os.popen('lastlog').readlines()
print(lastlog)
print(type(lastlog))
print(lastlog[0])
lastlog = os.popen('lastlog').readlines()
# for i in range(len(lastlog)):
# print(lastlog[i])
# lastlog_list.append(lastlog[i])
# print(lastlog_list)
print(lastlog)