вспомогательные утилиты

This commit is contained in:
Ground-Zerro
2024-09-15 17:47:18 +11:00
parent f722de829a
commit 5371affec9
3 changed files with 283 additions and 0 deletions

63
utilities/README.md Normal file
View File

@@ -0,0 +1,63 @@
# Вспомогательные утилиты:
- [subdomain - поиск субдоменов](#subdomain)
- [verified - проверка активности доменов](#verified)
## subdomain
Скрипт представляет собой парсер, который собирает субдомены (A-записи) указанного пользователем домена используя веб-сайт [rapiddns.io](https://rapiddns.io/subdomain/).
Результаты сохраняются в файл.
### Функции
- Загружает страницу по указанному URL и извлекает домены из таблиц, где тип записи равен "A". Пытается повторить запрос до 5 раз в случае ошибки или отсутствия данных.
- Отправляет запросы к страницам, начиная с первой, и обрабатывает до трех страниц одновременно. Останавливается, если данные на последних трех страницах одинаковы или если три страницы подряд пустые.
### Использование
1. Установите зависимости:
```bash
pip install -r requirements.txt
```
2. Запустите скрипт:
```bash
python subdomain.py
```
3. Введите URL домена, поддомены которого вы хотите спарсить, например:
```
example.com
```
4. Скрипт начнет парсинг страниц и сохранит найденные субдомены в файл `result.txt`.
## verified
Скрипт предназначен для проверки доменов на их делегированность.
### Функции
- Проверяет домены используя DNS-серверы: Google Public DNS, Cloudflare DNS и Yandex. Пул потоков ограничен 40 рабочими потоками.
- Возвращает статус домена (делегирован, припаркован/неактивен) или ошибку.
- Если статус домена не был подтвержден как делегированный проводит его контрольную проверку.
### Использование
1. Установите зависимости:
```bash
pip install dnspython
```
2. Поместите файл `result.txt` в корневую директорию проекта. Файл должен содержать список доменов, каждый на новой строке.
3. Запустите скрипт:
```bash
python verified.py
```
4. Скрипт проверит домены и сохранит результат в файл `verified_domains.txt`.

115
utilities/subdomain.py Normal file
View File

@@ -0,0 +1,115 @@
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random
def parse_page(url):
for attempt in range(5): # До 5 попыток для одной страницы
try:
response = requests.get(url)
if response.status_code == 404: # Проверка на несуществующую страницу
return None
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
data = set() # Используем множество для уникальных доменов
rows = soup.select('table tbody tr')
if not rows: # Если на странице нет строк, возвращаем None
return None
for row in rows:
columns = row.find_all('td')
if len(columns) > 3 and columns[2].text.strip() == 'A': # Проверка на тип записи 'A'
domain = columns[0].text.strip() # Извлечение столбца 'Domain'
data.add(domain) # Добавляем в множество
time.sleep(random.uniform(1, 3)) # Задержка между запросами
return data
except requests.exceptions.HTTPError as e:
if response.status_code == 429:
print(f"Ошибка загрузки {url}. Пробуем еще раз...")
time.sleep(3) # Фиксированная задержка перед повторной попыткой
else:
raise e
def parse_all_pages(base_url):
all_domains = set() # Используем множество для уникальных доменов
page = 1 # Всегда начинаем с первой страницы
keep_parsing = True
empty_page_attempts = 0 # Счётчик пустых страниц
recent_pages_data = [] # Список для хранения данных последних страниц
while keep_parsing:
print(f"Парсим страницы с {page} по {page + 2}")
pages = [f"{base_url}?page={p}" for p in range(page, page + 3)]
try:
with ThreadPoolExecutor(max_workers=3) as executor:
future_to_url = {executor.submit(parse_page, url): url for url in pages}
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result()
if result is None: # Если страница пуста или не существует
print(f"Страница {url.split('=')[-1]} не существует или пуста. Проверяем еще раз...")
empty_page_attempts += 1
time.sleep(3) # Ожидание перед повторной проверкой
if empty_page_attempts >= 3:
print(f"Страница {url.split('=')[-1]} пуста после 3 попыток. Остановка.")
keep_parsing = False
break
else:
continue # Переходим к следующей попытке
else:
empty_page_attempts = 0 # Обнуляем счётчик, если нашли данные
all_domains.update(result) # Добавляем новые домены в множество
print(f"Разбор {url} завершен.")
# Добавляем данные страницы в список для сравнения
recent_pages_data.append(result)
if len(recent_pages_data) > 3: # Храним данные только последних 3 страниц
recent_pages_data.pop(0)
# Проверяем, повторяются ли данные на последних трёх страницах
if len(recent_pages_data) == 3 and recent_pages_data[0] == recent_pages_data[1] == recent_pages_data[2]:
print(f"Данные на последних трёх страницах одинаковы. Остановка парсинга.")
keep_parsing = False
break
except Exception as e:
print(f"Ошибка парсинга {url}: {e}")
raise e
except requests.exceptions.HTTPError as e:
if '429' in str(e):
print("Ошибка 429. Пауза 4 секунды.")
time.sleep(3) # Пауза 3 секунды при ошибке 429
else:
raise e # Пробрасываем другие ошибки, если они не 429
page += 3 # Переход к следующему набору страниц
return all_domains
def get_subdomain_url():
base_url = 'https://rapiddns.io/subdomain/{url}'
url = input("Введите URL: ")
full_url = base_url.format(url=url)
return full_url # Возвращаем полный URL
base_url = get_subdomain_url() # Вызов функции для получения полного URL
domains = parse_all_pages(base_url)
# Запись результата в файл
with open('result.txt', 'w') as file:
for domain in sorted(domains): # Сортируем домены перед записью
file.write(f"{domain}\n")
print(f"Найдено {len(domains)} A записей. \nРезультаты сохранены в result.txt.")

105
utilities/werified.py Normal file
View File

@@ -0,0 +1,105 @@
import asyncio
import dns.resolver
from concurrent.futures import ThreadPoolExecutor
# DNS сервера для проверки
dns_servers = {
'Google Public DNS': ['8.8.8.8', '8.8.4.4'],
'Cloudflare DNS': ['1.1.1.1', '1.0.0.1'],
'Yandex': ['77.88.8.8', '77.88.8.1']
}
# Функция для проверки домена на определенном DNS-сервере
def check_domain(domain, resolver):
try:
answers = resolver.resolve(domain, 'A')
if answers:
return domain, 'Делегирован'
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
return domain, 'Припаркован или неактивен'
except Exception as e:
return domain, f'Ошибка: {e}'
# Асинхронная функция для проверки домена с использованием всех DNS-серверов
async def verify_domain_async(domain, dns_servers):
loop = asyncio.get_running_loop()
unverified_domains = set()
with ThreadPoolExecutor(max_workers=40) as executor:
futures = []
for dns_name, servers in dns_servers.items():
resolver = dns.resolver.Resolver()
resolver.nameservers = servers
# Добавляем задачи для каждого домена с каждым DNS-сервером
futures.append(loop.run_in_executor(executor, check_domain, domain, resolver))
results = await asyncio.gather(*futures)
# Фильтруем результаты и записываем во множество unverified_domains домены с ошибками или не делегированные
delegated = False
for domain, status in results:
if status == 'Делегирован':
delegated = True
print(f"{domain} {status}.")
break # Если домен делегирован хотя бы на одном сервере, прекращаем проверку
else:
print(f"{domain} {status}.")
unverified_domains.add(domain)
# Если домен не был подтвержден как делегированный, записываем его в список для повторной проверки
if not delegated:
return domain, unverified_domains
return domain, None
# Функция для проверки всех доменов
async def verify_all_domains(domain_list, dns_servers):
unverified_domains_set = set()
verified_domains = set()
print("Код запущен, но консоль может долго молчать. Терпи...")
# Асинхронная проверка всех доменов в первом прогоне
tasks = [verify_domain_async(domain, dns_servers) for domain in domain_list]
results = await asyncio.gather(*tasks)
# Обрабатываем результаты первого прогона
for domain, unverified in results:
if unverified:
unverified_domains_set.update(unverified)
else:
verified_domains.add(domain)
# Повторная проверка для unverified_domains_set
if unverified_domains_set:
print("\nЗапуск контрольной проверки неактивных доменов...\n")
second_check_tasks = [verify_domain_async(domain, dns_servers) for domain in unverified_domains_set]
second_check_results = await asyncio.gather(*second_check_tasks)
# Обрабатываем результаты второго прогона
for domain, unverified in second_check_results:
if not unverified:
verified_domains.add(domain)
return verified_domains
# Чтение доменов из файла result.txt
with open('result.txt', 'r') as file:
domain_list = [line.strip() for line in file.readlines()]
# Запуск асинхронной проверки доменов
verified_domains = asyncio.run(verify_all_domains(domain_list, dns_servers))
# Преобразование списка проверенных доменов в множество для исключения дубликатов и сортировка
unique_sorted_domains = sorted(set(verified_domains))
# Запись результатов в новый файл
with open('verified_domains.txt', 'w') as file:
for domain in unique_sorted_domains:
file.write(f"{domain}\n")
print(f"Проверенные домены сохранены в verified_domains.txt.\n Найдено {len(unique_sorted_domains)} уникальных активных доменов.")