Files
DomainMapper/utilities/subdomain.py
2024-09-15 17:53:46 +11:00

117 lines
5.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
from bs4 import BeautifulSoup
def parse_page(url):
for attempt in range(5): # До 5 попыток для одной страницы
try:
response = requests.get(url)
if response.status_code == 404: # Проверка на несуществующую страницу
return None
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
data = set() # Используем множество для уникальных доменов
rows = soup.select('table tbody tr')
if not rows: # Если на странице нет строк, возвращаем None
return None
for row in rows:
columns = row.find_all('td')
if len(columns) > 3 and columns[2].text.strip() == 'A': # Проверка на тип записи 'A'
domain = columns[0].text.strip() # Извлечение столбца 'Domain'
data.add(domain) # Добавляем в множество
time.sleep(random.uniform(1, 3)) # Задержка между запросами
return data
except requests.exceptions.HTTPError as e:
if response.status_code == 429:
print(f"Ошибка загрузки {url}. Пробуем еще раз...")
time.sleep(3) # Фиксированная задержка перед повторной попыткой
else:
raise e
def parse_all_pages(base_url):
all_domains = set() # Используем множество для уникальных доменов
page = 1 # Всегда начинаем с первой страницы
keep_parsing = True
empty_page_attempts = 0 # Счётчик пустых страниц
recent_pages_data = [] # Список для хранения данных последних страниц
while keep_parsing:
print(f"Парсим страницы с {page} по {page + 2}")
pages = [f"{base_url}?page={p}" for p in range(page, page + 3)]
try:
with ThreadPoolExecutor(max_workers=3) as executor:
future_to_url = {executor.submit(parse_page, url): url for url in pages}
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result()
if result is None: # Если страница пуста или не существует
print(f"Страница {url.split('=')[-1]} не существует или пуста. Проверяем еще раз...")
empty_page_attempts += 1
time.sleep(3) # Ожидание перед повторной проверкой
if empty_page_attempts >= 3:
print(f"Страница {url.split('=')[-1]} пуста после 3 попыток. Остановка.")
keep_parsing = False
break
else:
continue # Переходим к следующей попытке
else:
empty_page_attempts = 0 # Обнуляем счётчик, если нашли данные
all_domains.update(result) # Добавляем новые домены в множество
print(f"Разбор {url} завершен.")
# Добавляем данные страницы в список для сравнения
recent_pages_data.append(result)
if len(recent_pages_data) > 3: # Храним данные только последних 3 страниц
recent_pages_data.pop(0)
# Проверяем, повторяются ли данные на последних трёх страницах
if len(recent_pages_data) == 3 and recent_pages_data[0] == recent_pages_data[1] == recent_pages_data[2]:
print(f"Данные на последних трёх страницах одинаковы. Остановка парсинга.")
keep_parsing = False
break
except Exception as e:
print(f"Ошибка парсинга {url}: {e}")
raise e
except requests.exceptions.HTTPError as e:
if '429' in str(e):
print("Ошибка 429. Пауза 4 секунды.")
time.sleep(3) # Пауза 3 секунды при ошибке 429
else:
raise e # Пробрасываем другие ошибки, если они не 429
page += 3 # Переход к следующему набору страниц
return all_domains
def get_subdomain_url():
base_url = 'https://rapiddns.io/subdomain/{url}'
url = input("Введите URL: ")
full_url = base_url.format(url=url)
return full_url # Возвращаем полный URL
base_url = get_subdomain_url() # Вызов функции для получения полного URL
domains = parse_all_pages(base_url)
# Запись результата в файл
with open('result.txt', 'w') as file:
for domain in sorted(domains): # Сортируем домены перед записью
file.write(f"{domain}\n")
print(f"Найдено {len(domains)} A записей. \nРезультаты сохранены в result.txt.")