"""Upload CI build artifacts to a Telegram chat through a bot account.

Usage: pass the files to upload as command-line arguments.

Configuration comes from environment variables. BOT_TOKEN, CHAT_ID,
COMMIT_URL, COMMIT_MESSAGE, RUN_URL, TITLE and VERSION are required;
MESSAGE_THREAD_ID is optional.
"""
import asyncio
import os
import sys
from typing import Optional

from telethon import TelegramClient
from telethon.tl.functions.help import GetConfigRequest  # noqa: F401 -- not referenced in this file

API_ID = 611335
API_HASH = "d524b414d21f4d37f08684c1df41ac9c"

# Captions longer than this are replaced by the bare commit URL.
# 1024 matches Telegram's documented caption length limit.
CAPTION_LIMIT = 1024


def _int_from_env(name: str) -> Optional[int]:
    """Return environment variable *name* as an int, or None if unset/blank.

    Parsing is done here, not with a bare ``int(os.environ.get(...))``,
    so that a missing variable reaches ``check_environ`` as ``None`` and is
    reported there instead of raising ``TypeError`` at import time.
    A value that is present but not an integer terminates the script with a
    readable message.
    """
    raw = os.environ.get(name)
    if raw is None or not raw.strip():
        # CI systems commonly pass unset variables as empty strings.
        return None
    try:
        return int(raw)
    except ValueError:
        sys.exit(f"[-] Invalid {name}: expected an integer")


BOT_TOKEN = os.environ.get("BOT_TOKEN")
CHAT_ID = _int_from_env("CHAT_ID")
# Optional: when absent the upload is sent without ``reply_to``.
MESSAGE_THREAD_ID = _int_from_env("MESSAGE_THREAD_ID")
COMMIT_URL = os.environ.get("COMMIT_URL")
COMMIT_MESSAGE = os.environ.get("COMMIT_MESSAGE")
RUN_URL = os.environ.get("RUN_URL")
TITLE = os.environ.get("TITLE")
VERSION = os.environ.get("VERSION")

MSG_TEMPLATE = """
**{title}**
#ci_{version}
```
{commit_message}
```
[Commit]({commit_url})
[Workflow run]({run_url})
""".strip()


def get_caption() -> Optional[str]:
    """Build the Markdown caption for the upload.

    Returns the filled-in ``MSG_TEMPLATE``; if that exceeds
    ``CAPTION_LIMIT`` characters, returns ``COMMIT_URL`` alone instead.
    """
    msg = MSG_TEMPLATE.format(
        title=TITLE,
        version=VERSION,
        commit_message=COMMIT_MESSAGE,
        commit_url=COMMIT_URL,
        run_url=RUN_URL,
    )
    if len(msg) > CAPTION_LIMIT:
        return COMMIT_URL
    return msg


def check_environ() -> None:
    """Exit with status 1 if any required environment variable is missing.

    Every missing variable is reported (one ``[-] Invalid NAME`` line each)
    before exiting, so a misconfigured job can be fixed in one pass.
    """
    required = {
        "BOT_TOKEN": BOT_TOKEN,
        "CHAT_ID": CHAT_ID,
        "COMMIT_URL": COMMIT_URL,
        "COMMIT_MESSAGE": COMMIT_MESSAGE,
        "RUN_URL": RUN_URL,
        "TITLE": TITLE,
        "VERSION": VERSION,
    }
    missing = [name for name, value in required.items() if value is None]
    for name in missing:
        print(f"[-] Invalid {name}")
    if missing:
        sys.exit(1)


async def main() -> None:
    """Validate configuration, log in as the bot and upload the given files.

    The files come from ``sys.argv[1:]``. They are sent in a single
    ``send_file`` call; only the last file carries the caption.
    Exits with status 1 when configuration is incomplete or no files are given.
    """
    print("[+] Uploading to telegram")
    check_environ()

    files = sys.argv[1:]
    print("[+] Files:", files)
    if not files:
        print("[-] No files to upload")
        sys.exit(1)

    print("[+] Logging in Telegram with bot")
    # session=None: no session name is supplied (presumably an in-memory
    # session that is not persisted -- see the Telethon client docs).
    client = TelegramClient(session=None, api_id=API_ID, api_hash=API_HASH)
    async with await client.start(bot_token=BOT_TOKEN) as bot:
        # One caption per file; all empty except the last one.
        caption = [""] * len(files)
        caption[-1] = get_caption()
        print("[+] Caption: ")
        print("---")
        print(caption)
        print("---")
        print("[+] Sending")
        await bot.send_file(
            entity=CHAT_ID,
            file=files,
            caption=caption,
            reply_to=MESSAGE_THREAD_ID,
            parse_mode="markdown",
        )
        print("[+] Done!")
        await bot.log_out()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as e:
        # NOTE(review): upload errors are only printed, so the process still
        # exits with status 0 (validation failures exit 1 via SystemExit,
        # which is not caught here). This looks like deliberate best-effort
        # so a Telegram outage does not fail the build -- confirm.
        print(f"[-] An error occurred: {e}")