157 lines
5.5 KiB
Python
157 lines
5.5 KiB
Python
import logging
|
|
# Root logging setup: INFO and above, each record prefixed with a
# right-aligned level name, timestamp and logger name.
logging.basicConfig(
    level=logging.INFO,
    format='[%(levelname) 5s/%(asctime)s] %(name)s: %(message)s',
)
|
|
|
|
from telethon import TelegramClient, events
|
|
from telethon.tl.types import InputChannel
|
|
from telethon.errors import FloodWaitError
|
|
|
|
import asyncio, json, time, sys, yaml, os, time, re
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from datetime import (
|
|
datetime,
|
|
timezone,
|
|
timedelta
|
|
)
|
|
|
|
# APScheduler job defaults: collapse a backlog of missed runs into a single
# run ('coalesce') and never run more than one instance of a job at a time.
job_defaults = {
    'coalesce': True,
    'max_instances': 1,
}

# Regexp patterns whose matches are stripped from mirrored message text.
# Raw strings avoid invalid-escape warnings ("\w" in a plain string literal),
# and the dot in "t.me" is escaped so it only matches a literal ".".
patterns = [
    r'https://t\.me/\w*',
    r'@[Ww]hite[Aa]ction',
]

# One alternation of all patterns; each is wrapped in a non-capturing group so
# a pattern's own alternations cannot bleed into its neighbours.
combined_patterns = r'|'.join(map(r'(?:{})'.format, patterns))
|
|
|
|
# Create and install the event loop explicitly. Calling
# asyncio.get_event_loop() when no loop is set is deprecated in recent
# Python versions (and an error from 3.14). Installing it as the current loop
# lets later get_event_loop() calls from running code resolve to this loop.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Scheduler for the indexer job; it is only constructed here, and jobs are
# added and the scheduler started from inside main().
scheduler = AsyncIOScheduler(job_defaults=job_defaults)
|
|
|
|
def callback(current, total):
    """Print download progress; used as ``progress_callback`` for media downloads.

    Args:
        current: Bytes downloaded so far.
        total: Expected total size in bytes. It may be 0 or None, in which
            case no percentage is printed.
    """
    if not total:
        # Guard the division: a zero or missing total would otherwise raise
        # ZeroDivisionError/TypeError from inside the download.
        print('Downloaded', current, 'bytes')
        return
    print('Downloaded', current, 'out of', total,
          'bytes: {:.2%}'.format(current / total))
|
|
|
|
async def _call_with_flood_wait(func, *args, attempts=3, **kwargs):
    """Await ``func(*args, **kwargs)``, sleeping out Telegram flood waits.

    On ``FloodWaitError`` this sleeps for the requested number of seconds and
    retries, making at most ``attempts`` calls in total. The last call is not
    guarded, so a persistent flood wait propagates to the caller.
    """
    for _ in range(attempts - 1):
        try:
            return await func(*args, **kwargs)
        except FloodWaitError as e:
            logging.error(f'Flood wait for {e.seconds} for _indexer.')
            await asyncio.sleep(e.seconds)
    return await func(*args, **kwargs)


async def _resolve_channels(client, config):
    """Return ``(input_ids, output_ids)`` for dialogs named in *config*.

    A dialog is an input if its name is in ``config["input_channel_names"]``,
    otherwise an output if it is in ``config["output_channel_names"]``.
    Exits the process with status 1 if either list ends up empty.
    """
    input_channels_entities = []
    output_channel_entities = []

    async for dialog in client.iter_dialogs():
        if dialog.name in config["input_channel_names"]:
            logging.info(f"Adding {dialog.name} to input channels.")
            input_channels_entities.append(dialog.entity.id)
        elif dialog.name in config["output_channel_names"]:
            logging.info(f"Adding {dialog.name} to output channels.")
            output_channel_entities.append(dialog.entity.id)

    if not output_channel_entities:
        logging.error("Could not find any output channels in the user's dialogs")
        sys.exit(1)
    if not input_channels_entities:
        logging.error("Could not find any input channels in the user's dialogs")
        sys.exit(1)

    return input_channels_entities, output_channel_entities


async def _mirror_message(client, message, output_channels):
    """Copy one message to every channel in *output_channels*.

    Text has ``combined_patterns`` stripped. Media is downloaded once into
    ``usermedia/`` and re-uploaded to each output channel, with the cleaned
    text as caption when there is any. The local file is deleted afterwards,
    even if an upload fails. A message with no downloadable media and no text
    left after stripping is skipped.
    """
    txt = re.sub(combined_patterns, '', message.text) if message.text else ''

    path = None
    if message.media is not None:
        path = await _call_with_flood_wait(
            client.download_media,
            message.media,
            file='usermedia/',
            progress_callback=callback,
        )

    if path is None and not txt:
        # Nothing left to send (e.g. the text was only stripped links);
        # skip instead of sending an empty message.
        return

    try:
        for output_channel in output_channels:
            if path is not None:
                logging.info(f'Sending file to {output_channel}.')
                if txt:
                    await _call_with_flood_wait(
                        client.send_file, output_channel, path, caption=txt)
                else:
                    await _call_with_flood_wait(
                        client.send_file, output_channel, path)
            else:
                logging.info(f'Sending message to {output_channel}.')
                await _call_with_flood_wait(
                    client.send_message, output_channel, txt)
    finally:
        # Remove the downloaded file once every upload was attempted.
        if path is not None:
            os.remove(path)


async def _indexer(client, config):
    """Mirror the full history of every input channel into every output channel.

    Channels are looked up by dialog name, using ``config["input_channel_names"]``
    and ``config["output_channel_names"]``. Messages are replayed oldest-first.
    A failure on one message is logged and counted rather than aborting the
    run. A summary with the message and error counts is logged at the end.
    """
    input_channels_entities, output_channel_entities = await _resolve_channels(client, config)

    logging.info(f"Listening on {len(input_channels_entities)} channels. Mirroring messages to {len(output_channel_entities)} channels.")

    # Downloaded media is staged here before being re-uploaded.
    os.makedirs('usermedia', exist_ok=True)

    size = errors = 0
    start_time = time.time()

    for input_channel in input_channels_entities:
        # reverse=True walks history oldest -> newest; wait_time spaces out
        # the underlying history requests.
        async for message in client.iter_messages(input_channel, limit=None, reverse=True, wait_time=10):
            size += 1
            try:
                await _mirror_message(client, message, output_channel_entities)
            except Exception as e:
                errors += 1
                logging.error(f'Error occurred while processing message on {input_channel}.\n Error: {e}')

        logging.info(f'..: Indexing completed for {input_channel} :..')

    logging.info(f'Processed {size} messages in {time.time() - start_time} seconds')
    logging.info(f' with a total of {errors} errors\n')
|
|
|
|
async def main():
    """Load the YAML config, log in to Telegram and schedule one mirror run.

    Usage: ``<script> CONFIG_PATH``. The config must provide ``api_id`` and
    ``api_hash``, plus the channel-name lists that ``_indexer`` reads.
    ``_indexer`` is scheduled to run once, one minute after start-up. The
    coroutine then blocks until the client disconnects, and finally shuts
    down the scheduler and disconnects the client.
    """
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} {{CONFIG_PATH}}")
        sys.exit(1)

    with open(sys.argv[1], 'rb') as f:
        # safe_load: the config file cannot instantiate arbitrary objects.
        config = yaml.safe_load(f)

    client = TelegramClient('anon',
                            config["api_id"],
                            config["api_hash"])

    await client.start()

    try:
        # Initial job run 1 minute after the bot starts.
        scheduler.add_job(
            _indexer,
            'date',
            args=[client, config],
            run_date=datetime.now(tz=timezone.utc) + timedelta(minutes=1),
            id="run_once_indexer",
        )

        scheduler.start()

        await client.run_until_disconnected()
    finally:
        # Stop scheduling (without waiting on running jobs) and release the
        # Telegram session even if startup or the main wait failed.
        if scheduler.running:
            scheduler.shutdown(wait=False)
        await client.disconnect()


if __name__ == '__main__':
    loop.run_until_complete(main())