Initial commit
This commit is contained in:
140
bot.py
Normal file
140
bot.py
Normal file
@@ -0,0 +1,140 @@
|
|||||||
|
import logging
|
||||||
|
logging.basicConfig(format='[%(levelname) 5s/%(asctime)s] %(name)s: %(message)s',
|
||||||
|
level=logging.INFO)
|
||||||
|
|
||||||
|
from telethon import TelegramClient, events
|
||||||
|
from telethon.tl.types import InputChannel
|
||||||
|
from telethon.errors import FloodWaitError
|
||||||
|
|
||||||
|
import asyncio, json, time, sys, yaml, os, time
|
||||||
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||||
|
from datetime import (
|
||||||
|
datetime,
|
||||||
|
timezone,
|
||||||
|
timedelta
|
||||||
|
)
|
||||||
|
|
||||||
|
# Ap scheduler options
|
||||||
|
job_defaults = {
|
||||||
|
'coalesce': True,
|
||||||
|
'max_instances': 1
|
||||||
|
}
|
||||||
|
|
||||||
|
# Start Scheduler
|
||||||
|
scheduler = AsyncIOScheduler(job_defaults=job_defaults)
|
||||||
|
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
|
# Printing download progress
|
||||||
|
def callback(current, total):
|
||||||
|
print('Downloaded', current, 'out of', total,
|
||||||
|
'bytes: {:.2%}'.format(current / total))
|
||||||
|
|
||||||
|
async def _indexer(client, config):
|
||||||
|
input_channels_entities = []
|
||||||
|
output_channel_entities = []
|
||||||
|
|
||||||
|
# loop through user dialogs
|
||||||
|
async for dialog in client.iter_dialogs():
|
||||||
|
if dialog.name in config["input_channel_names"]:
|
||||||
|
logging.info(f"Adding {dialog.name} to input channels.")
|
||||||
|
input_channels_entities.append(dialog.entity.id)
|
||||||
|
elif dialog.name in config["output_channel_names"]:
|
||||||
|
logging.info(f"Adding {dialog.name} to output channels.")
|
||||||
|
output_channel_entities.append(dialog.entity.id)
|
||||||
|
|
||||||
|
if not output_channel_entities:
|
||||||
|
logging.error("Could not find any output channels in the user's dialogs")
|
||||||
|
sys.exit(1)
|
||||||
|
elif not input_channels_entities:
|
||||||
|
logging.error("Could not find any input channels in the user's dialogs")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
logging.info(f"Listening on {len(input_channels_entities)} channels. Mirroring messages to {len(output_channel_entities)} channels.")
|
||||||
|
|
||||||
|
size = errors = 0
|
||||||
|
start_time = time.time()
|
||||||
|
|
||||||
|
for input_channel in input_channels_entities:
|
||||||
|
async for message in client.iter_messages(input_channel, limit=None, likewhoareverse=True, wait_time=10):
|
||||||
|
try:
|
||||||
|
for output_channel in output_channel_entities:
|
||||||
|
# Process attachments
|
||||||
|
if message.media is not None:
|
||||||
|
os.makedirs('usermedia', exist_ok=True)
|
||||||
|
|
||||||
|
path = await client.download_media(message.media, file='usermedia/', progress_callback=callback)
|
||||||
|
if path is not None:
|
||||||
|
logging.info(f'Sending file to {output_channel}.')
|
||||||
|
await client.send_file(
|
||||||
|
output_channel,
|
||||||
|
path,
|
||||||
|
caption=message.text
|
||||||
|
)
|
||||||
|
|
||||||
|
# Remove file after upload
|
||||||
|
os.remove(path)
|
||||||
|
else:
|
||||||
|
if message.text:
|
||||||
|
logging.info(f'Sending message to {output_channel}..')
|
||||||
|
|
||||||
|
await client.send_message(
|
||||||
|
output_channel,
|
||||||
|
message.text
|
||||||
|
)
|
||||||
|
elif message.text:
|
||||||
|
logging.info(f'Sending message to {output_channel}.')
|
||||||
|
|
||||||
|
await client.send_message(
|
||||||
|
output_channel,
|
||||||
|
message.text
|
||||||
|
)
|
||||||
|
except FloodWaitError as e:
|
||||||
|
logging.error(f'Flood wait for {e.seconds} for _indexer.')
|
||||||
|
|
||||||
|
await asyncio.sleep(e.seconds)
|
||||||
|
|
||||||
|
continue
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f'Error occurred while processing message on {input_channel}.\n Error: {e}')
|
||||||
|
|
||||||
|
continue
|
||||||
|
except ValueError as e:
|
||||||
|
logging.error(f'Error:\n{e}')
|
||||||
|
|
||||||
|
continue
|
||||||
|
|
||||||
|
# logging.info("Sleeping for 21 seconds.")
|
||||||
|
# asyncio.sleep(21)
|
||||||
|
|
||||||
|
logging.info(f'Processed {size} messages in {time.time() - start_time} seconds')
|
||||||
|
logging.info(f' with a total of {errors} errors\n')
|
||||||
|
logging.info(f'..: Indexing completed for {input_channel} :..')
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
if len(sys.argv) < 2:
|
||||||
|
print(f"Usage: {sys.argv[0]} {{CONFIG_PATH}}")
|
||||||
|
sys.exit(1)
|
||||||
|
with open(sys.argv[1], 'rb') as f:
|
||||||
|
config = yaml.safe_load(f)
|
||||||
|
|
||||||
|
client = TelegramClient('anon',
|
||||||
|
config["api_id"],
|
||||||
|
config["api_hash"])
|
||||||
|
|
||||||
|
await client.start()
|
||||||
|
|
||||||
|
# Initial job run 1 minutes after execution of bot
|
||||||
|
scheduler.add_job(
|
||||||
|
_indexer,
|
||||||
|
'date',
|
||||||
|
args=[client, config],
|
||||||
|
run_date=datetime.now(tz=timezone.utc) + timedelta(seconds=1*60),
|
||||||
|
id="run_once_indexer"
|
||||||
|
)
|
||||||
|
|
||||||
|
scheduler.start()
|
||||||
|
|
||||||
|
await client.run_until_disconnected()
|
||||||
|
|
||||||
|
loop.run_until_complete(main())
|
||||||
Reference in New Issue
Block a user