Files
telegram_groupmirror/bot.py
2023-12-27 00:12:44 -05:00

140 lines
4.9 KiB
Python

import logging
# Configure root logging at INFO level; each record is prefixed with its
# level, timestamp and logger name.
logging.basicConfig(format='[%(levelname) 5s/%(asctime)s] %(name)s: %(message)s',
level=logging.INFO)
from telethon import TelegramClient, events
from telethon.tl.types import InputChannel
from telethon.errors import FloodWaitError
import asyncio, json, time, sys, yaml, os, time
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import (
datetime,
timezone,
timedelta
)
# Defaults handed to the scheduler and applied to the jobs registered on it
# (see the APScheduler documentation for the meaning of each option).
job_defaults = {'coalesce': True, 'max_instances': 1}

# Build the asyncio-based scheduler. It is only constructed here; the call
# to scheduler.start() happens later, inside main().
scheduler = AsyncIOScheduler(job_defaults=job_defaults)
# Obtain the event loop the script will run on. Calling
# asyncio.get_event_loop() when no loop is current is deprecated and raises
# RuntimeError on newer Python versions, so fall back to creating one
# explicitly and installing it as the current loop. A loop that is already
# installed is reused, so nothing bound to it earlier is orphaned.
try:
    loop = asyncio.get_event_loop()
except RuntimeError:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
# Printing download progress
def callback(current, total):
    """Print the progress of a download to stdout.

    Used as the ``progress_callback`` for media downloads.

    Args:
        current: Number of bytes downloaded so far.
        total: Total number of bytes expected.
    """
    if not total:
        # A zero (or missing) total makes the percentage undefined; report
        # the raw byte counts instead of raising ZeroDivisionError.
        print('Downloaded', current, 'out of', total, 'bytes')
        return
    print('Downloaded', current, 'out of', total,
          'bytes: {:.2%}'.format(current / total))
async def _mirror_message(client, message, output_channel_entities):
    """Send one source message to every output channel.

    Media is downloaded once into ``usermedia/`` and the same local file is
    uploaded to each output channel; the file is removed afterwards even if
    an upload fails. Messages without media (or whose media could not be
    downloaded) are forwarded as text when they have any.
    """
    path = None
    if message.media is not None:
        os.makedirs('usermedia', exist_ok=True)
        path = await client.download_media(message.media, file='usermedia/', progress_callback=callback)
    try:
        for output_channel in output_channel_entities:
            if path is not None:
                logging.info(f'Sending file to {output_channel}.')
                await client.send_file(
                    output_channel,
                    path,
                    caption=message.text
                )
            elif message.text:
                if message.media is not None:
                    # The media download produced no file: fall back to text only.
                    logging.info(f'Sending message to {output_channel}..')
                else:
                    logging.info(f'Sending message to {output_channel}.')
                await client.send_message(
                    output_channel,
                    message.text
                )
    finally:
        # Remove the temporary download whether or not the uploads succeeded.
        if path is not None:
            os.remove(path)


async def _indexer(client, config):
    """Copy the full history of the input channels to the output channels.

    The user's dialogs are scanned and matched by name against
    ``config["input_channel_names"]`` and ``config["output_channel_names"]``
    (a dialog listed in both is treated as an input). Every message of every
    input channel is then iterated with ``reverse=True`` and mirrored to all
    output channels.

    A failure on one message is logged and counted, and processing moves on
    to the next message. On a flood-wait error the coroutine additionally
    sleeps for the number of seconds requested before continuing; the
    message that triggered it is not retried.

    Args:
        client: Connected Telegram client used to read and send messages.
        config: Mapping holding the ``input_channel_names`` and
            ``output_channel_names`` collections.

    Raises:
        SystemExit: If no input channel or no output channel is found among
            the user's dialogs.
    """
    input_channels_entities = []
    output_channel_entities = []

    # Resolve the configured channel names to entity ids via the user's dialogs.
    async for dialog in client.iter_dialogs():
        if dialog.name in config["input_channel_names"]:
            logging.info(f"Adding {dialog.name} to input channels.")
            input_channels_entities.append(dialog.entity.id)
        elif dialog.name in config["output_channel_names"]:
            logging.info(f"Adding {dialog.name} to output channels.")
            output_channel_entities.append(dialog.entity.id)

    if not output_channel_entities:
        logging.error("Could not find any output channels in the user's dialogs")
        sys.exit(1)
    elif not input_channels_entities:
        logging.error("Could not find any input channels in the user's dialogs")
        sys.exit(1)

    logging.info(f"Listening on {len(input_channels_entities)} channels. Mirroring messages to {len(output_channel_entities)} channels.")

    size = errors = 0
    start_time = time.time()
    for input_channel in input_channels_entities:
        async for message in client.iter_messages(input_channel, limit=None, reverse=True, wait_time=10):
            size += 1
            try:
                await _mirror_message(client, message, output_channel_entities)
            except FloodWaitError as e:
                errors += 1
                logging.error(f'Flood wait for {e.seconds} for _indexer.')
                # Back off for as long as the server asked before continuing.
                await asyncio.sleep(e.seconds)
            except Exception as e:
                # Best effort: one bad message must not abort the whole run.
                errors += 1
                logging.error(f'Error occurred while processing message on {input_channel}.\n Error: {e}')
        logging.info(f'..: Indexing completed for {input_channel} :..')

    logging.info(f'Processed {size} messages in {time.time() - start_time} seconds')
    logging.info(f' with a total of {errors} errors\n')
async def main():
    """Read the config, log in to Telegram and schedule the indexer.

    The path of a YAML configuration file is taken from the first
    command-line argument; without it a usage line is printed and the
    process exits with status 1. The config must provide ``api_id`` and
    ``api_hash``. After the client has started, a single 'date'-triggered
    job running ``_indexer`` is registered for one minute in the future,
    the scheduler is started, and the coroutine then waits until the
    client disconnects.
    """
    if not len(sys.argv) >= 2:
        print(f"Usage: {sys.argv[0]} {{CONFIG_PATH}}")
        sys.exit(1)

    config_path = sys.argv[1]
    with open(config_path, 'rb') as config_file:
        config = yaml.safe_load(config_file)

    api_id = config["api_id"]
    api_hash = config["api_hash"]
    client = TelegramClient('anon', api_id, api_hash)
    await client.start()

    # First (and only) indexer run: one minute after the bot has started.
    first_run = datetime.now(tz=timezone.utc) + timedelta(minutes=1)
    job_options = {
        'args': [client, config],
        'run_date': first_run,
        'id': "run_once_indexer",
    }
    scheduler.add_job(_indexer, 'date', **job_options)
    scheduler.start()

    await client.run_until_disconnected()
# Script entry point: run main() to completion on the module-level event loop.
loop.run_until_complete(main())