diff --git a/bot.py b/bot.py new file mode 100644 index 0000000..0cadb3d --- /dev/null +++ b/bot.py @@ -0,0 +1,140 @@ +import logging +logging.basicConfig(format='[%(levelname) 5s/%(asctime)s] %(name)s: %(message)s', + level=logging.INFO) + +from telethon import TelegramClient, events +from telethon.tl.types import InputChannel +from telethon.errors import FloodWaitError + +import asyncio, json, time, sys, yaml, os, time +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from datetime import ( + datetime, + timezone, + timedelta +) + +# Ap scheduler options +job_defaults = { + 'coalesce': True, + 'max_instances': 1 +} + +# Start Scheduler +scheduler = AsyncIOScheduler(job_defaults=job_defaults) + +loop = asyncio.get_event_loop() + +# Printing download progress +def callback(current, total): + print('Downloaded', current, 'out of', total, + 'bytes: {:.2%}'.format(current / total)) + +async def _indexer(client, config): + input_channels_entities = [] + output_channel_entities = [] + + # loop through user dialogs + async for dialog in client.iter_dialogs(): + if dialog.name in config["input_channel_names"]: + logging.info(f"Adding {dialog.name} to input channels.") + input_channels_entities.append(dialog.entity.id) + elif dialog.name in config["output_channel_names"]: + logging.info(f"Adding {dialog.name} to output channels.") + output_channel_entities.append(dialog.entity.id) + + if not output_channel_entities: + logging.error("Could not find any output channels in the user's dialogs") + sys.exit(1) + elif not input_channels_entities: + logging.error("Could not find any input channels in the user's dialogs") + sys.exit(1) + + logging.info(f"Listening on {len(input_channels_entities)} channels. Mirroring messages to {len(output_channel_entities)} channels.") + + size = errors = 0 + start_time = time.time() + + for input_channel in input_channels_entities: + async for message in client.iter_messages(input_channel, limit=None, likewhoareverse=True, wait_time=10): + try: + for output_channel in output_channel_entities: + # Process attachments + if message.media is not None: + os.makedirs('usermedia', exist_ok=True) + + path = await client.download_media(message.media, file='usermedia/', progress_callback=callback) + if path is not None: + logging.info(f'Sending file to {output_channel}.') + await client.send_file( + output_channel, + path, + caption=message.text + ) + + # Remove file after upload + os.remove(path) + else: + if message.text: + logging.info(f'Sending message to {output_channel}..') + + await client.send_message( + output_channel, + message.text + ) + elif message.text: + logging.info(f'Sending message to {output_channel}.') + + await client.send_message( + output_channel, + message.text + ) + except FloodWaitError as e: + logging.error(f'Flood wait for {e.seconds} for _indexer.') + + await asyncio.sleep(e.seconds) + + continue + except Exception as e: + logging.error(f'Error occurred while processing message on {input_channel}.\n Error: {e}') + + continue + except ValueError as e: + logging.error(f'Error:\n{e}') + + continue + + # logging.info("Sleeping for 21 seconds.") + # asyncio.sleep(21) + + logging.info(f'Processed {size} messages in {time.time() - start_time} seconds') + logging.info(f' with a total of {errors} errors\n') + logging.info(f'..: Indexing completed for {input_channel} :..') + +async def main(): + if len(sys.argv) < 2: + print(f"Usage: {sys.argv[0]} {{CONFIG_PATH}}") + sys.exit(1) + with open(sys.argv[1], 'rb') as f: + config = yaml.safe_load(f) + + client = TelegramClient('anon', + config["api_id"], + config["api_hash"]) + + await client.start() + + # Initial job run 1 minutes after execution of bot + scheduler.add_job( + _indexer, + 'date', + args=[client, config], + run_date=datetime.now(tz=timezone.utc) + timedelta(seconds=1*60), + id="run_once_indexer" + ) + + scheduler.start() + + await client.run_until_disconnected() + +loop.run_until_complete(main()) \ No newline at end of file