python脚本使用阿里云slb对恶意攻击进行封堵的实现

环境准备:

1.安装python3.7和相关的依赖

并安装redis缓存数据库

pip install aliyun-python-sdk-corepip install aliyun-python-sdk-slbpip intall IPypip intall redispip intall paramiko

2.添加ram访问控制的编程接口用户

3.添加slb的访问控制策略并和需要频控的slb进行绑定

redis封堵ip的格式

脚本程序目录

Aliyun_SLB_Manager├── helpers│   ├── common.py│   ├── email.py│   ├── remote.py│   └── slb.py├── logs│   └── run_20210204.log└── run.py

# 程序核心就是使用shell命令对nginx的日志中出现的ip地址 和 访问的接口进行过滤,找出访问频繁的那些程序加入slb黑名单,同时加入redis缓存,因为slb有封堵ip个数限制,redis中存储的ip需要设置过期时间,对比后删除slb中封堵的Ip

# grep 04/Feb/2021:15:4 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api' | awk '{print $1}' | sort | uniq -c | sort -r -n | head -200  2454 114.248.45.15  1576 47.115.122.23  1569 47.107.239.148  269 112.32.217.52grep 04/Feb/2021:14:5 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api' | awk '{print $1}' | awk -F ':' '{print $2}' | sort | uniq -c | sort -r -n | head -200 | awk '{if ($1 >15)print $1,$2}'[root@alisz-edraw-api-server-web01:~]# grep 04/Feb/2021:15:4 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api' | awk '{print $1}' | sort | uniq -c | sort -r -n | head -3  2454 114.248.45.15  1576 47.115.122.23  1569 47.107.239.148

python脚本
主入口程序

run.py

import timefrom helpers.email import send_mailfrom helpers.remote import get_black_ipsfrom helpers.common import is_white_ip,get_ban_ip_time,set_ban_ip_time,groupsfrom helpers.slb import slb_add_host,slb_del_host,slb_get_hostif __name__ == "__main__":  # aliyun 访问控制针对 slb 的管理用户  # 用户登录名称 slb-frequency-user@xxx.onaliyun.com  accessKeyId = 'id'  accessSecret = 'pass'  # slb 访问控制策略id  acl_id = 'acl-slb'  # reginid 查询地址:https://help.aliyun.com/document_detail/40654.html?spm=a2c6h.13066369.0.0.54a17471VmN3kA  region_id = 'cn-shenzhen'  # 黑名单限制个数 300  slb_limit = 200  # 每10分钟访问限制阈值  threshold = 50  # 接收邮箱  mails = ['reblue520@chinasoft.cn']  # 远程ssh执行grep过滤出可疑ip  res = get_black_ips(threshold)  deny_host_list = res[0]  hosts_with_count = res[1]  hosts_with_count = sorted(hosts_with_count.items(), key=lambda x: x[1] , reverse=True)  print(hosts_with_count)  # exit()  # 等待被ban的ip , 过滤掉ip白名单  deny_hosts = []  for host in deny_host_list:    if (is_white_ip(host) == False):      deny_hosts.append(host + '/32')  # 获取所有已经被ban的ip  response = slb_get_host(accessKeyId , accessSecret , acl_id , region_id)  denied_hosts = []  if('AclEntrys' in response.keys()):    for item in response['AclEntrys']['AclEntry']:      denied_hosts.append(item['AclEntryIP'])  # 被ban超过2天,首先移除  must_del_hosts = []  denied_hosts_clone = denied_hosts.copy()  for host in denied_hosts:    if (get_ban_ip_time(host) == 0 or (get_ban_ip_time(host) < int(round(time.time())) - 2* 24 * 3600)):      must_del_hosts.append(host)      denied_hosts_clone.remove(host)  # 排除相同的  deny_hosts_new = []  for item in deny_hosts:    if(item not in denied_hosts_clone):      deny_hosts_new.append(item)  # 两者和超过300的限制  if((len(denied_hosts_clone)+len(deny_hosts_new))>slb_limit):    denied_hosts_detail = {}    for host in denied_hosts_clone:      denied_hosts_detail[host] = get_ban_ip_time(host)    # 需要排除的数量    num = len(denied_hosts_clone) + len(deny_hosts_new) - slb_limit    denied_hosts_detail = sorted(denied_hosts_detail.items(), key=lambda x: x[1])    denied_hosts_detail = denied_hosts_detail[:num]    for item in denied_hosts_detail:      must_del_hosts.append(item[0])  print("denied:",denied_hosts)  print("delete:",must_del_hosts)  print("add:",deny_hosts_new)  # exit()  # 先删除一部分 must_del_hosts  if(len(must_del_hosts)>0):    if (len(must_del_hosts)>50):      must_del_hosts_clone = groups(must_del_hosts,50)      for item in must_del_hosts_clone:        slb_del_host(item, accessKeyId, accessSecret, acl_id, region_id)        time.sleep(1)    else :      slb_del_host(must_del_hosts, accessKeyId, accessSecret, acl_id, region_id)  # 再新增 deny_hosts_new  if(len(deny_hosts_new)>0):    if(len(deny_hosts_new)>50):      deny_hosts_new_clone = groups(deny_hosts_new,50)      for item in deny_hosts_new_clone:        slb_add_host(item, accessKeyId, accessSecret, acl_id, region_id)        time.sleep(1)    else:      slb_add_host(deny_hosts_new, accessKeyId, accessSecret, acl_id, region_id)  # 记录ip被禁时间  for host in deny_hosts_new:    set_ban_ip_time(host)  if (len(deny_hosts_new) >= 1):    mail_content = ''    if(len(must_del_hosts) > 0):      mail_content += "以下黑名单已被解禁("+str(len(must_del_hosts))+"):/n"+"/n".join(must_del_hosts) + "/n"    mail_content += "/n新增以下ip黑名单("+str(len(deny_hosts_new))+"):/n"+"/n".join(deny_hosts_new)    mail_content += "/n/n10分钟访问超过15次("+str(len(hosts_with_count))+"):/n"    for item in hosts_with_count:      mail_content += str(item[1]) + " " + str(item[0]) + "/n"    mail_content += "/n/n黑名单("+str(len(denied_hosts))+"个):/n"    for item in denied_hosts:      mail_content += str(item) + "/n"    send_mail(mail_content , mails)

slb操作相关的脚本
slb.py

import logging , jsonfrom aliyunsdkcore.client import AcsClientfrom aliyunsdkslb.request.v20140515.AddAccessControlListEntryRequest import AddAccessControlListEntryRequestfrom aliyunsdkslb.request.v20140515.RemoveAccessControlListEntryRequest import RemoveAccessControlListEntryRequestfrom aliyunsdkslb.request.v20140515.DescribeAccessControlListAttributeRequest import DescribeAccessControlListAttributeRequest# 阿里云slb访问控制里添加ipdef slb_add_host(hosts, accessKeyId, accessSecret, acl_id, region_id):  client = AcsClient(accessKeyId, accessSecret, region_id)  request = AddAccessControlListEntryRequest()  request.set_accept_format('json')  logging.info("正在封印IP:%s" % ",".join(hosts))  try:    add_hosts = []    for host in hosts:      add_hosts.append({"entry": host, "comment": "deny"})    request.set_AclEntrys(add_hosts)    request.set_AclId(acl_id)    response = client.do_action_with_exception(request)    print(response)  except BaseException as e:    logging.error("添加黑名单失败,原因:%s" % e)# slb删除ipdef slb_del_host(hosts, accessKeyId, accessSecret, acl_id , region_id = 'us-west-1'):  logging.info("正在解封IP:%s" % ",".join(hosts))  try:    del_hosts = []    for host in hosts:      del_hosts.append({"entry": host, "comment": "deny"})    client = AcsClient(accessKeyId, accessSecret, region_id)    request = RemoveAccessControlListEntryRequest()    request.set_accept_format('json')    request.set_AclEntrys(del_hosts)    request.set_AclId(acl_id)    client.do_action_with_exception(request)    logging.info("slb删除IP:%s成功" % ",".join(hosts)) # 查看调用接口结果    logging.info("slb删除IP:%s成功" % ",".join(hosts)) # 查看调用接口结果  except BaseException as e:    logging.error("移出黑名单失败,原因:%s" % e)# 阿里云slb获取IP黑名单列表def slb_get_host(accessKeyId, accessSecret, acl_id, region_id):  client = AcsClient(accessKeyId, accessSecret, region_id)  request = DescribeAccessControlListAttributeRequest()  request.set_accept_format('json')  try:    request.set_AclId(acl_id)    response = client.do_action_with_exception(request)    data_sub = json.loads((response.decode("utf-8")))    return data_sub  except BaseException as e:    logging.error("获取黑名单失败,原因:%s" % e)

远程操作日志的脚本
remote.py

#!/usr/bin/python# -*- coding: UTF-8 -*-import datetimeimport reimport paramikodef get_black_ips(threshold = 100):  # file = '/data/www/logs/nginx_log/access/*api*_access.log'  file = '/data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log'  # 可以 ssh 访问服务器 nginx 日志的用户信息  username = 'apache'  passwd = 'pass'  ten_min_time = (datetime.datetime.now() - datetime.timedelta(minutes=10)).strftime("%d/%b/%Y:%H:%M")  ten_min_time = ten_min_time[:-1]  # 线上 需要对日志进行过滤的目标服务器,一般是内网ip,本地调试时可以直接使用外网ip方便调试  ssh_hosts = ['1.1.1.1']  deny_host_list = []  for host in ssh_hosts:    '''    # 过滤日志文件,需要显示如下效果,次数 ip地址,需要定位具体的api接口,否则误伤率极高    # grep 04/Feb/2021:15:2 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api' | awk '{print $1}' | sort | uniq -c | sort -r -n | head -5 | awk '{if ($1 >15)print $1,$2}'    2998 116.248.89.2    2381 114.248.45.15    1639 47.107.239.148    1580 47.115.122.23    245 59.109.149.45    '''    shell = (          # "grep %s %s | grep '/index.php?submod=checkout&method=index&pid' | awk '{print $1}' | awk -F ':' '{print $2}' | sort | uniq -c | sort -r -n | head -200 | awk '{if ($1 >15)print $1,$2}'") % (          # grep 04/Feb/2021:14:5 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api/user' | awk '{print $1}' | awk -F ':' '{print $2}' | sort | uniq -c | sort -r -n | head -200 | awk '{if ($1 >15)print $1,$2}'          "grep %s %s | grep '/api' | awk '{print $1}' | sort | uniq -c | sort -r -n | head -200 | awk '{if ($1 >2000)print $1,$2}'") % (          ten_min_time, file)    print(shell)    ssh = paramiko.SSHClient()    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())    ssh.connect(host, port=2020, username=username, password=passwd)    stdin, stdout, stderr = ssh.exec_command(shell)    result = stdout.read().decode(encoding="utf-8")    deny_host_re = re.compile(r'/d{1,99} /d{1,3}/./d{1,3}/./d{1,3}/./d{1,3}')    deny_host_re = deny_host_re.findall(result)    deny_host_list = deny_host_list + deny_host_re  uniq_host = {}  for host_str in deny_host_list:    tmp = host_str.split(' ')    if tmp[1] in uniq_host:      uniq_host[tmp[1]] += int(tmp[0])    else:      uniq_host[tmp[1]] = int(tmp[0])  deny_host_list = []  for v in uniq_host:    if (uniq_host[v] > threshold):      deny_host_list.append(v)  return [deny_host_list , uniq_host]

发送邮件的脚本
email.py

#!/usr/bin/python# -*- coding: UTF-8 -*-import smtplibfrom email.mime.text import MIMETextfrom email.header import Headerimport loggingdef send_mail(host , receivers):  # 发送邮件的服务器,用户信息  mail_host = "smtpdm-ap-southeast-1.aliyun.com"  mail_user = "admin@mail.chinasoft.com"  mail_pass = "pass"  sender = 'admin@mail.chinasoft.com'  message = MIMEText('chinasoft国内接口被刷,单个IP最近10分钟内访问超过阈值100次会收到此邮件告警!!!!/n%s' % (host), 'plain', 'utf-8')  message['From'] = Header("chinasoft国内接口被刷", 'utf-8')  subject ='[DDOS]购买链接接口异常链接!!'  message['Subject'] = Header(subject, 'utf-8')  try:    smtpObj = smtplib.SMTP(mail_host, 80)    smtpObj.login(mail_user, mail_pass)    smtpObj.sendmail(sender, receivers, message.as_string())    logging.info("邮件发送成功")  except smtplib.SMTPException as e:    logging.error("发送邮件失败,原因:%s" % e)

配置文件
common.py

#!/usr/bin/python# -*- coding: UTF-8 -*-import IPyfrom functools import reduceimport redis,timedef groups(L1,len1):  groups=zip(*(iter(L1),)*len1)  L2=[list(i) for i in groups]  n=len(L1) % len1  L2.append(L1[-n:]) if n !=0 else L2  return L2def ip_into_int(ip):  return reduce(lambda x, y: (x << 8) + y, map(int, ip.split('.')))# 过滤掉内网ipdef is_internal_ip(ip):  ip = ip_into_int(ip)  net_a = ip_into_int('10.255.255.255') >> 24  net_b = ip_into_int('172.31.255.255') >> 20  net_c = ip_into_int('192.168.255.255') >> 16  return ip >> 24 == net_a or ip >> 20 == net_b or ip >> 16 == net_c# 是否为白名单ip (公司内网+集群内网ip+slb和需要互访的服务器ip避免误杀)def is_white_ip(ip):  if (is_internal_ip(ip)):    return True  white_hosts = [    # web-servers    '1.1.1.1',    '1.1.1.2',  ];  for white in white_hosts:    if (ip in IPy.IP(white)):      return True  return Falsedef get_ban_ip_time(ip):  pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=1)  client = redis.Redis(connection_pool=pool)  key = 'slb_ban_'+ip  val = client.get(key)  if val == None:    return 0  else :    return int(val)def set_ban_ip_time(ip):  pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=1)  client = redis.Redis(connection_pool=pool)  key = 'slb_ban_'+ip  timestamp = time.time()  timestamp = int(round(timestamp))  return client.set(key , timestamp , 86400)

本地可以直接运行run.py进行调试

到此这篇关于python脚本使用阿里云slb对恶意攻击进行封堵的实现的文章就介绍到这了,更多相关python脚本阿里云slb内容请搜索 以前的文章或继续浏览下面的相关文章希望大家以后多多支持 !

相关文章

发表新评论