"""Mirror the history of Telegram input channels to output channels.

Usage: python <script> CONFIG_PATH

The YAML config must provide ``api_id``, ``api_hash``, ``input_channel_names``
and ``output_channel_names``. About one minute after start-up the scheduler
runs ``_indexer`` once. It copies every message of each input channel
(oldest first) to every output channel, with the ``patterns`` below stripped
from the text.
"""
import logging

logging.basicConfig(format='[%(levelname) 5s/%(asctime)s] %(name)s: %(message)s',
                    level=logging.INFO)

import asyncio
import json
import os
import re
import sys
import time
from datetime import datetime, timedelta, timezone

import yaml
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from telethon import TelegramClient, events
from telethon.errors import FloodWaitError
from telethon.tl.types import InputChannel

# APScheduler job options: merge missed runs into one and never run a job
# concurrently with itself.
job_defaults = {
    'coalesce': True,
    'max_instances': 1,
}

# Regexp patterns to remove from mirrored text.
# Raw strings avoid invalid-escape warnings. The dot in "t.me" is escaped so it
# matches only a literal '.'.
patterns = [
    r'https://t\.me/\w*',
    r'@[Ww]hite[Aa]ction',
]
combined_patterns = r'|'.join(map(r'(?:{})'.format, patterns))
# Compiled once instead of on every re.sub call.
_combined_regex = re.compile(combined_patterns)

# How many times a single Telegram call is retried after a FloodWaitError
# before it is counted as a failure.
MAX_FLOOD_RETRIES = 3

# Create the event loop explicitly. asyncio.get_event_loop() with no running
# loop is deprecated in recent Python versions. The loop is passed to the
# scheduler so that both the scheduler and the client share it.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

scheduler = AsyncIOScheduler(job_defaults=job_defaults, event_loop=loop)


def callback(current, total):
    """Print download progress; used as ``progress_callback`` for downloads.

    A falsy ``total`` (0 or None) only prints the byte count, so the
    percentage computation never divides by zero.
    """
    if total:
        print('Downloaded', current, 'out of', total,
              'bytes: {:.2%}'.format(current / total))
    else:
        print('Downloaded', current, 'bytes')


def _clean_text(text):
    """Return *text* with every match of ``patterns`` removed."""
    return _combined_regex.sub('', text)


async def _with_flood_retry(make_call):
    """Await ``make_call()``, sleeping and retrying when Telegram rate-limits us.

    After MAX_FLOOD_RETRIES retries the FloodWaitError is re-raised so the
    caller counts the failure instead of silently dropping the message.
    """
    for attempt in range(MAX_FLOOD_RETRIES + 1):
        try:
            return await make_call()
        except FloodWaitError as e:
            if attempt == MAX_FLOOD_RETRIES:
                raise
            logging.error(f'Flood wait for {e.seconds} for _indexer.')
            await asyncio.sleep(e.seconds)


async def _mirror_to_channel(client, output_channel, message, path):
    """Send one source message to a single output channel.

    *path* is the local file downloaded for the message's media. It is None
    when the message has no media or the download produced no file. In that
    case only the cleaned text (if any) is sent.
    """
    if path is not None:
        logging.info(f'Sending file to {output_channel}.')
        if message.text:
            caption = _clean_text(message.text)
            await _with_flood_retry(
                lambda: client.send_file(output_channel, path, caption=caption))
        else:
            await _with_flood_retry(lambda: client.send_file(output_channel, path))
    elif message.text:
        logging.info(f'Sending message to {output_channel}.')
        text = _clean_text(message.text)
        await _with_flood_retry(lambda: client.send_message(output_channel, text))


async def _mirror_message(client, message, input_channel, output_channels):
    """Mirror *message* to every output channel and return the number of failures.

    Media is downloaded once and reused for all output channels. The local
    file is always removed afterwards, even when a send fails. A failure on
    one output channel does not prevent sending to the others.
    """
    path = None
    if message.media is not None:
        try:
            os.makedirs('usermedia', exist_ok=True)
            path = await _with_flood_retry(lambda: client.download_media(
                message.media, file='usermedia/', progress_callback=callback))
        except Exception as e:
            logging.error(f'Error occurred while processing message on {input_channel}.\n Error: {e}')
            return 1

    failures = 0
    try:
        for output_channel in output_channels:
            try:
                await _mirror_to_channel(client, output_channel, message, path)
            except Exception as e:
                failures += 1
                logging.error(f'Error occurred while processing message on {input_channel}.\n Error: {e}')
    finally:
        # Remove the file after upload, regardless of send success.
        if path is not None:
            try:
                os.remove(path)
            except FileNotFoundError:
                pass
    return failures


async def _indexer(client, config):
    """Copy the full history of every input channel to every output channel.

    Dialogs are matched by name against config["input_channel_names"] and
    config["output_channel_names"]. A dialog whose name is in both lists is
    treated as an input only. The process exits with status 1 if either list
    matches no dialog.
    """
    input_channels_entities = []
    output_channel_entities = []

    # Loop through the user's dialogs to resolve channel names to entity ids.
    async for dialog in client.iter_dialogs():
        if dialog.name in config["input_channel_names"]:
            logging.info(f"Adding {dialog.name} to input channels.")
            input_channels_entities.append(dialog.entity.id)
        elif dialog.name in config["output_channel_names"]:
            logging.info(f"Adding {dialog.name} to output channels.")
            output_channel_entities.append(dialog.entity.id)

    if not output_channel_entities:
        logging.error("Could not find any output channels in the user's dialogs")
        sys.exit(1)
    elif not input_channels_entities:
        logging.error("Could not find any input channels in the user's dialogs")
        sys.exit(1)

    logging.info(f"Listening on {len(input_channels_entities)} channels. "
                 f"Mirroring messages to {len(output_channel_entities)} channels.")

    # Counters are cumulative across all input channels.
    size = errors = 0
    start_time = time.time()
    for input_channel in input_channels_entities:
        # reverse=True walks the history oldest-first so order is preserved.
        async for message in client.iter_messages(input_channel, limit=None,
                                                  reverse=True, wait_time=10):
            size += 1
            errors += await _mirror_message(client, message, input_channel,
                                            output_channel_entities)
        logging.info(f'Processed {size} messages in {time.time() - start_time} seconds')
        logging.info(f' with a total of {errors} errors\n')
        logging.info(f'..: Indexing completed for {input_channel} :..')


async def main():
    """Load the config, start the Telegram client and schedule the one-shot indexer."""
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} {{CONFIG_PATH}}")
        sys.exit(1)

    with open(sys.argv[1], 'rb') as f:
        config = yaml.safe_load(f)

    client = TelegramClient('anon', config["api_id"], config["api_hash"])
    await client.start()

    # Initial job run 1 minute after execution of the bot.
    scheduler.add_job(
        _indexer,
        'date',
        args=[client, config],
        run_date=datetime.now(tz=timezone.utc) + timedelta(seconds=1 * 60),
        id="run_once_indexer",
    )
    scheduler.start()
    await client.run_until_disconnected()


if __name__ == '__main__':
    loop.run_until_complete(main())