mirror of
https://github.com/Ground-Zerro/DomainMapper.git
synced 2025-12-10 01:47:18 +07:00
349 lines
13 KiB
Python
349 lines
13 KiB
Python
import asyncio
|
||
import ipaddress
|
||
import os
|
||
import re
|
||
from collections import defaultdict
|
||
|
||
import httpx
|
||
from colorama import Fore, Style, init
|
||
|
||
init(autoreset=True)
|
||
|
||
|
||
def yellow(text):
|
||
return f"{Fore.YELLOW}{text}{Style.RESET_ALL}"
|
||
|
||
|
||
def green(text):
|
||
return f"{Fore.GREEN}{text}{Style.RESET_ALL}"
|
||
|
||
|
||
def cyan(text):
|
||
return f"{Fore.CYAN}{text}{Style.RESET_ALL}"
|
||
|
||
|
||
def red(text):
|
||
return f"{Fore.RED}{text}{Style.RESET_ALL}"
|
||
|
||
|
||
def magneta(text):
|
||
return f"{Fore.MAGENTA}{text}{Style.RESET_ALL}"
|
||
|
||
def blue(text):
|
||
return f"{Fore.BLUE}{text}{Style.RESET_ALL}"
|
||
|
||
def gateway_input(gateway):
|
||
if not gateway:
|
||
input_gateway = input(f"Укажите {green('IP шлюза')} или {green('имя интерфейса')}: ")
|
||
return input_gateway.strip() if input_gateway else None
|
||
else:
|
||
return gateway
|
||
|
||
def ken_gateway_input(ken_gateway):
|
||
if not ken_gateway:
|
||
input_ken_gateway = input(
|
||
f"Укажите {green('IP шлюза')} или {green('имя интерфейса')} или {green('IP шлюза')} и через пробел {green('имя интерфейса')}: ")
|
||
return input_ken_gateway.strip() if input_ken_gateway else None
|
||
else:
|
||
return ken_gateway
|
||
|
||
async def get_cloudflare_ips():
|
||
try:
|
||
async with httpx.AsyncClient() as client:
|
||
response = await client.get("https://www.cloudflare.com/ips-v4/")
|
||
response.raise_for_status()
|
||
text = response.text
|
||
cloudflare_ips = set()
|
||
for line in text.splitlines():
|
||
line = line.strip()
|
||
if '/' in line:
|
||
try:
|
||
ip_network = ipaddress.ip_network(line)
|
||
for ip in ip_network:
|
||
cloudflare_ips.add(str(ip))
|
||
except ValueError:
|
||
continue
|
||
return cloudflare_ips
|
||
except Exception as e:
|
||
print("Ошибка при получении IP адресов Cloudflare:", e)
|
||
return set()
|
||
|
||
def check_include_cloudflare(cloudflare):
|
||
if cloudflare in ['yes', 'y', 'no', 'n']:
|
||
return cloudflare in ['yes', 'y']
|
||
|
||
user_input = input(
|
||
f"\n{yellow('Исключить IP адреса Cloudflare из итогового списка?')}"
|
||
f"\n1. исключить"
|
||
f"\n{green('Enter')} - оставить"
|
||
f"\nВаш выбор: "
|
||
).strip()
|
||
|
||
if user_input == '1':
|
||
return True
|
||
else:
|
||
return False
|
||
|
||
def mk_list_name_input(mk_list_name):
|
||
if not mk_list_name:
|
||
input_mk_list_name = input(f"Введите {green('LIST_NAME')} для Mikrotik firewall: ")
|
||
return input_mk_list_name.strip() if input_mk_list_name else None
|
||
else:
|
||
return mk_list_name
|
||
|
||
def comment(selected_service):
|
||
return ",".join(["".join(word.title() for word in s.split()) for s in selected_service])
|
||
|
||
def subnet_input(subnet):
|
||
if not subnet:
|
||
choice = input(
|
||
f"\n{yellow('Объединить IP-адреса в подсети?')}"
|
||
f"\n1. сократить до {green('/16')} (255.255.0.0)"
|
||
f"\n2. сократить до {green('/24')} (255.255.255.0)"
|
||
f"\n3. сократить до {green('/24')} + {green('/32')} (255.255.255.0 и 255.255.255.255)"
|
||
f"\n{green('Enter')} - пропустить"
|
||
f"\nВаш выбор: "
|
||
).strip()
|
||
|
||
if choice == '1':
|
||
subnet = '16'
|
||
elif choice == '2':
|
||
subnet = '24'
|
||
elif choice == '3':
|
||
subnet = 'mix'
|
||
else:
|
||
subnet = '32'
|
||
|
||
return subnet if subnet in {'16', '24', 'mix'} else '32'
|
||
|
||
def group_ips_in_subnets_optimized(filename: str, subnet: str):
|
||
try:
|
||
with open(filename, 'r', encoding='utf-8') as file:
|
||
ips = {line.strip() for line in file if line.strip()}
|
||
|
||
subnets = set()
|
||
|
||
if subnet == "16":
|
||
for ip in ips:
|
||
try:
|
||
network = ipaddress.IPv4Network(f"{ip}/16", strict=False)
|
||
subnets.add(str(network.network_address))
|
||
except ValueError:
|
||
continue
|
||
print(f"{Style.BRIGHT}IP-адреса агрегированы до /16 подсети{Style.RESET_ALL}")
|
||
|
||
elif subnet == "24":
|
||
for ip in ips:
|
||
try:
|
||
network = ipaddress.IPv4Network(f"{ip}/24", strict=False)
|
||
subnets.add(str(network.network_address))
|
||
except ValueError:
|
||
continue
|
||
print(f"{Style.BRIGHT}IP-адреса агрегированы до /24 подсети{Style.RESET_ALL}")
|
||
|
||
elif subnet == "mix":
|
||
octet_groups = defaultdict(list)
|
||
for ip in ips:
|
||
key = '.'.join(ip.split('.')[:3])
|
||
octet_groups[key].append(ip)
|
||
|
||
for key, group in octet_groups.items():
|
||
if len(group) > 1:
|
||
subnets.add(key + '.0')
|
||
else:
|
||
subnets.update(group)
|
||
|
||
print(f"{Style.BRIGHT}IP-адреса агрегированы до масок /24 и /32{Style.RESET_ALL}")
|
||
|
||
with open(filename, 'w', encoding='utf-8') as file:
|
||
for subnet_ip in sorted(subnets, key=lambda x: ipaddress.IPv4Address(x.split('/')[0])):
|
||
file.write(subnet_ip + '\n')
|
||
|
||
except Exception as e:
|
||
print(f"Ошибка при обработке файла: {e}")
|
||
|
||
def split_file_by_lines(filename: str, max_lines: int = 999):
|
||
try:
|
||
with open(filename, 'r', encoding='utf-8') as file:
|
||
lines = file.readlines()
|
||
|
||
total_lines = len(lines)
|
||
|
||
if total_lines <= max_lines:
|
||
return False
|
||
|
||
base_name = filename.rsplit('.', 1)[0] if '.' in filename else filename
|
||
extension = '.' + filename.rsplit('.', 1)[1] if '.' in filename else '.txt'
|
||
|
||
num_parts = (total_lines + max_lines - 1) // max_lines
|
||
|
||
print(f"\n{Style.BRIGHT}Результаты сохранены в файлы:{Style.RESET_ALL}")
|
||
for part in range(num_parts):
|
||
start_index = part * max_lines
|
||
end_index = min((part + 1) * max_lines, total_lines)
|
||
|
||
part_filename = f"{base_name}_p{part + 1}{extension}"
|
||
|
||
with open(part_filename, 'w', encoding='utf-8') as file:
|
||
file.writelines(lines[start_index:end_index])
|
||
|
||
print(f"{Style.BRIGHT}{part_filename} ({end_index - start_index} строк){Style.RESET_ALL}")
|
||
|
||
print(f"{Style.BRIGHT}Разделение завершено. Создано {num_parts} частей{Style.RESET_ALL}")
|
||
|
||
try:
|
||
os.remove(filename)
|
||
except Exception as e:
|
||
print(f"{red('Не удалось удалить исходный файл:')} {e}")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"{red('Ошибка при разделении файла:')} {e}")
|
||
return False
|
||
def process_file_format(filename, filetype, gateway, selected_service, mk_list_name, mk_comment, subnet, ken_gateway):
|
||
def read_file(filename):
|
||
try:
|
||
with open(filename, 'r', encoding='utf-8') as file:
|
||
return file.readlines()
|
||
except Exception as e:
|
||
print(f"Ошибка чтения файла: {e}")
|
||
return None
|
||
|
||
def write_file(filename, ips, formatter):
|
||
formatted_ips = [formatter(ip.strip()) for ip in ips]
|
||
with open(filename, 'w', encoding='utf-8') as file:
|
||
if filetype.lower() == 'wireguard':
|
||
file.write(', '.join(formatted_ips))
|
||
else:
|
||
file.write('\n'.join(formatted_ips))
|
||
|
||
net_mask = subnet if subnet == "mix" else "255.255.0.0" if subnet == "16" else "255.255.255.0" if subnet == "24" else "255.255.255.255"
|
||
|
||
if not filetype:
|
||
user_input = input(f"""
|
||
{yellow('В каком формате сохранить файл?')}
|
||
1. {green('win')} - route add {cyan('IP')} mask {net_mask} {cyan('GATEWAY')}
|
||
2. {green('unix')} - ip route {cyan('IP')}/{subnet} {cyan('GATEWAY')}
|
||
3. {green('keenetic bat')} - route add {cyan('IP')} mask {net_mask} 0.0.0.0
|
||
4. {green('keenetic cli')} - ip route {cyan('IP')}/{subnet} {cyan('GATEWAY GATEWAY_NAME')} auto !{comment(selected_service)}
|
||
5. {green('cidr')} - {cyan('IP')}/{subnet}
|
||
6. {green('mikrotik')} - /ip/firewall/address-list add list={cyan("LIST_NAME")}{f' comment="{comment(selected_service)}"' if mk_comment != "off" else ""} address={cyan("IP")}/{subnet}
|
||
7. {green('ovpn')} - push "route {cyan('IP')} {net_mask}"
|
||
8. {green('wireguard')} - {cyan('IP')}/{subnet}, {cyan('IP')}/{subnet}, и т.д...
|
||
{green('Enter')} - {cyan('IP')}
|
||
Ваш выбор: """).strip()
|
||
|
||
mapping = {
|
||
'1': 'win',
|
||
'2': 'unix',
|
||
'3': 'keenetic bat',
|
||
'4': 'keenetic cli',
|
||
'5': 'cidr',
|
||
'6': 'mikrotik',
|
||
'7': 'ovpn',
|
||
'8': 'wireguard'
|
||
}
|
||
filetype = mapping.get(user_input, '')
|
||
|
||
ips = read_file(filename)
|
||
if not ips:
|
||
return
|
||
|
||
if filetype in ['win', 'unix']:
|
||
gateway = gateway_input(gateway)
|
||
elif filetype == 'keenetic cli':
|
||
ken_gateway = ken_gateway_input(ken_gateway)
|
||
elif filetype == 'mikrotik':
|
||
mk_list_name = mk_list_name_input(mk_list_name)
|
||
|
||
formatters = {
|
||
'win': lambda ip: f"route add {ip} mask {net_mask} {gateway}",
|
||
'unix': lambda ip: f"ip route {ip}/{subnet} {gateway}",
|
||
'keenetic bat': lambda ip: f"route add {ip} mask {net_mask} 0.0.0.0",
|
||
'keenetic cli': lambda ip: f"ip route {ip}/{subnet} {ken_gateway} auto !{comment(selected_service)}",
|
||
'cidr': lambda ip: f"{ip}/{subnet}",
|
||
'ovpn': lambda ip: f'push "route {ip} {net_mask}"',
|
||
'mikrotik': lambda ip: f'/ip/firewall/address-list add list={mk_list_name}' + (f' comment="{comment(selected_service)}"' if mk_comment != "off" else "") + f' address={ip}/{subnet}',
|
||
'wireguard': lambda ip: f"{ip}/{subnet}"
|
||
}
|
||
|
||
if subnet == "mix":
|
||
if filetype in ['win', 'keenetic bat']:
|
||
mix_formatter = lambda ip: f"{ip.strip()} mask 255.255.255.0" if ip.endswith('.0') else f"{ip.strip()} mask 255.255.255.255"
|
||
elif filetype.lower() == 'ovpn':
|
||
mix_formatter = lambda ip: f"{ip.strip()} 255.255.255.0" if ip.endswith('.0') else f"{ip.strip()} 255.255.255.255"
|
||
else:
|
||
mix_formatter = lambda ip: f"{ip.strip()}/24" if ip.endswith('.0') else f"{ip.strip()}/32"
|
||
|
||
formatters.update({
|
||
'win': lambda ip: f"route add {mix_formatter(ip)} {gateway}",
|
||
'unix': lambda ip: f"ip route {mix_formatter(ip)} {gateway}",
|
||
'keenetic bat': lambda ip: f"route add {mix_formatter(ip)} 0.0.0.0",
|
||
'keenetic cli': lambda ip: f"ip route {mix_formatter(ip)} {ken_gateway} auto !{comment(selected_service)}",
|
||
'cidr': lambda ip: f"{mix_formatter(ip)}",
|
||
'ovpn': lambda ip: f'push "route {mix_formatter(ip)}"',
|
||
'mikrotik': lambda ip: f'/ip/firewall/address-list add list={mk_list_name}' + (f' comment="{comment(selected_service)}"' if mk_comment != "off" else "") + f' address={mix_formatter(ip)}',
|
||
'wireguard': lambda ip: f"{mix_formatter(ip)}"
|
||
})
|
||
|
||
if filetype.lower() in formatters:
|
||
write_file(filename, ips, formatters[filetype.lower()])
|
||
|
||
if filetype.lower() == 'keenetic bat':
|
||
return split_file_by_lines(filename, max_lines=999)
|
||
|
||
return False
|
||
|
||
async def main():
|
||
filename = "ip.txt"
|
||
cloudflare = None
|
||
subnet = None
|
||
filetype = None
|
||
gateway = None
|
||
selected_services = ["Service"]
|
||
mk_list_name = None
|
||
mk_comment = 'off'
|
||
ken_gateway = None
|
||
|
||
if not os.path.exists(filename):
|
||
print(f"\n{red(f'Ошибка: файл {filename} не найден!')}")
|
||
print(f"{yellow('Инструкция:')}")
|
||
print(f"1. Создайте файл {green(filename)} в текущей директории")
|
||
print(f"2. Добавьте в него IP-адреса (по одному на строку) или текст содержащий IP-адреса")
|
||
print(f"3. Запустите скрипт снова")
|
||
return
|
||
|
||
ip_pattern = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')
|
||
|
||
with open(filename, 'r') as file:
|
||
ips = set()
|
||
for line in file:
|
||
found_ips = ip_pattern.findall(line)
|
||
ips.update(found_ips)
|
||
|
||
include_cloudflare = check_include_cloudflare(cloudflare)
|
||
if include_cloudflare:
|
||
cloudflare_ips = await get_cloudflare_ips()
|
||
else:
|
||
cloudflare_ips = set()
|
||
|
||
ips -= cloudflare_ips
|
||
|
||
with open(filename, 'w', encoding='utf-8') as file:
|
||
for ip in sorted(ips):
|
||
file.write(ip + '\n')
|
||
|
||
subnet = subnet_input(subnet)
|
||
if subnet != '32':
|
||
group_ips_in_subnets_optimized(filename, subnet)
|
||
|
||
file_was_split = process_file_format(filename, filetype, gateway, selected_services, mk_list_name, mk_comment, subnet, ken_gateway)
|
||
|
||
if not file_was_split:
|
||
print(f"\n{Style.BRIGHT}Результаты сохранены в файл:{Style.RESET_ALL} {filename}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(main())
|