mirror of
https://github.com/Ground-Zerro/DomainMapper.git
synced 2025-12-10 01:47:18 +07:00
spelling error
This commit is contained in:
106
utilities/verified.py
Normal file
106
utilities/verified.py
Normal file
@@ -0,0 +1,106 @@
|
||||
import asyncio
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
import dns.resolver
|
||||
|
||||
# DNS сервера для проверки
|
||||
dns_servers = {
|
||||
'Google Public DNS': ['8.8.8.8', '8.8.4.4'],
|
||||
'Cloudflare DNS': ['1.1.1.1', '1.0.0.1'],
|
||||
'Yandex': ['77.88.8.8', '77.88.8.1']
|
||||
}
|
||||
|
||||
|
||||
# Функция для проверки домена на определенном DNS-сервере
|
||||
def check_domain(domain, resolver):
|
||||
try:
|
||||
answers = resolver.resolve(domain, 'A')
|
||||
if answers:
|
||||
return domain, 'Делегирован'
|
||||
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
|
||||
return domain, 'Припаркован или неактивен'
|
||||
except Exception as e:
|
||||
return domain, f'Ошибка: {e}'
|
||||
|
||||
|
||||
# Асинхронная функция для проверки домена с использованием всех DNS-серверов
|
||||
async def verify_domain_async(domain, dns_servers):
|
||||
loop = asyncio.get_running_loop()
|
||||
unverified_domains = set()
|
||||
|
||||
with ThreadPoolExecutor(max_workers=40) as executor:
|
||||
futures = []
|
||||
for dns_name, servers in dns_servers.items():
|
||||
resolver = dns.resolver.Resolver()
|
||||
resolver.nameservers = servers
|
||||
|
||||
# Добавляем задачи для каждого домена с каждым DNS-сервером
|
||||
futures.append(loop.run_in_executor(executor, check_domain, domain, resolver))
|
||||
|
||||
results = await asyncio.gather(*futures)
|
||||
|
||||
# Фильтруем результаты и записываем во множество unverified_domains домены с ошибками или не делегированные
|
||||
delegated = False
|
||||
for domain, status in results:
|
||||
if status == 'Делегирован':
|
||||
delegated = True
|
||||
print(f"{domain} {status}.")
|
||||
break # Если домен делегирован хотя бы на одном сервере, прекращаем проверку
|
||||
else:
|
||||
print(f"{domain} {status}.")
|
||||
unverified_domains.add(domain)
|
||||
|
||||
# Если домен не был подтвержден как делегированный, записываем его в список для повторной проверки
|
||||
if not delegated:
|
||||
return domain, unverified_domains
|
||||
return domain, None
|
||||
|
||||
|
||||
# Функция для проверки всех доменов
|
||||
async def verify_all_domains(domain_list, dns_servers):
|
||||
unverified_domains_set = set()
|
||||
verified_domains = set()
|
||||
|
||||
print("Код запущен, но консоль может долго молчать. Терпи...")
|
||||
|
||||
# Асинхронная проверка всех доменов в первом прогоне
|
||||
tasks = [verify_domain_async(domain, dns_servers) for domain in domain_list]
|
||||
results = await asyncio.gather(*tasks)
|
||||
|
||||
# Обрабатываем результаты первого прогона
|
||||
for domain, unverified in results:
|
||||
if unverified:
|
||||
unverified_domains_set.update(unverified)
|
||||
else:
|
||||
verified_domains.add(domain)
|
||||
|
||||
# Повторная проверка для unverified_domains_set
|
||||
if unverified_domains_set:
|
||||
print("\nЗапуск контрольной проверки неактивных доменов...\n")
|
||||
second_check_tasks = [verify_domain_async(domain, dns_servers) for domain in unverified_domains_set]
|
||||
second_check_results = await asyncio.gather(*second_check_tasks)
|
||||
|
||||
# Обрабатываем результаты второго прогона
|
||||
for domain, unverified in second_check_results:
|
||||
if not unverified:
|
||||
verified_domains.add(domain)
|
||||
|
||||
return verified_domains
|
||||
|
||||
|
||||
# Чтение доменов из файла result.txt
|
||||
with open('result.txt', 'r') as file:
|
||||
domain_list = [line.strip() for line in file.readlines()]
|
||||
|
||||
# Запуск асинхронной проверки доменов
|
||||
verified_domains = asyncio.run(verify_all_domains(domain_list, dns_servers))
|
||||
|
||||
# Преобразование списка проверенных доменов в множество для исключения дубликатов и сортировка
|
||||
unique_sorted_domains = sorted(set(verified_domains))
|
||||
|
||||
# Запись результатов в новый файл
|
||||
with open('verified_domains.txt', 'w') as file:
|
||||
for domain in unique_sorted_domains:
|
||||
file.write(f"{domain}\n")
|
||||
|
||||
print(f"Проверенные домены сохранены в verified_domains.txt.\n Найдено {len(unique_sorted_domains)} уникальных активных доменов.")
|
||||
Reference in New Issue
Block a user