1#!/usr/bin/env python
2# This program is dedicated to the public domain under the CC0 license.
3# pylint: disable=import-error,wrong-import-position
4"""
5Simple example of a bot that uses a custom webhook setup and handles custom updates.
6For the custom webhook setup, the libraries `starlette` and `uvicorn` are used. Please install
7them as `pip install starlette~=0.20.0 uvicorn~=0.17.0`.
8Note that any other `asyncio` based web server framework can be used for a custom webhook setup
9just as well.
10
11Usage:
12Set bot token, url, admin chat_id and port at the start of the `main` function.
13You may also need to change the `listen` value in the uvicorn configuration to match your setup.
14Press Ctrl-C on the command line or send a signal to the process to stop the bot.
15"""
16import asyncio
17import html
18import logging
19from dataclasses import dataclass
20from http import HTTPStatus
21
22import uvicorn
23from starlette.applications import Starlette
24from starlette.requests import Request
25from starlette.responses import PlainTextResponse, Response
26from starlette.routing import Route
27
28from telegram import __version__ as TG_VER
29
30try:
31 from telegram import __version_info__
32except ImportError:
33 __version_info__ = (0, 0, 0, 0, 0) # type: ignore[assignment]
34
35if __version_info__ < (20, 0, 0, "alpha", 1):
36 raise RuntimeError(
37 f"This example is not compatible with your current PTB version {TG_VER}. To view the "
38 f"{TG_VER} version of this example, "
39 f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
40 )
41
42from telegram import Update
43from telegram.constants import ParseMode
44from telegram.ext import (
45 Application,
46 CallbackContext,
47 CommandHandler,
48 ContextTypes,
49 ExtBot,
50 TypeHandler,
51)
52
53# Enable logging
54logging.basicConfig(
55 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
56)
57logger = logging.getLogger(__name__)
58
59
60@dataclass
61class WebhookUpdate:
62 """Simple dataclass to wrap a custom update type"""
63
64 user_id: int
65 payload: str
66
67
68class CustomContext(CallbackContext[ExtBot, dict, dict, dict]):
69 """
70 Custom CallbackContext class that makes `user_data` available for updates of type
71 `WebhookUpdate`.
72 """
73
74 @classmethod
75 def from_update(
76 cls,
77 update: object,
78 application: "Application",
79 ) -> "CustomContext":
80 if isinstance(update, WebhookUpdate):
81 return cls(application=application, user_id=update.user_id)
82 return super().from_update(update, application)
83
84
85async def start(update: Update, context: CustomContext) -> None:
86 """Display a message with instructions on how to use this bot."""
87 url = context.bot_data["url"]
88 payload_url = html.escape(f"{url}/submitpayload?user_id=<your user id>&payload=<payload>")
89 text = (
90 f"To check if the bot is still running, call <code>{url}/healthcheck</code>.\n\n"
91 f"To post a custom update, call <code>{payload_url}</code>."
92 )
93 await update.message.reply_html(text=text)
94
95
96async def webhook_update(update: WebhookUpdate, context: CustomContext) -> None:
97 """Callback that handles the custom updates."""
98 chat_member = await context.bot.get_chat_member(chat_id=update.user_id, user_id=update.user_id)
99 payloads = context.user_data.setdefault("payloads", [])
100 payloads.append(update.payload)
101 combined_payloads = "</code>\n• <code>".join(payloads)
102 text = (
103 f"The user {chat_member.user.mention_html()} has sent a new payload. "
104 f"So far they have sent the following payloads: \n\n• <code>{combined_payloads}</code>"
105 )
106 await context.bot.send_message(
107 chat_id=context.bot_data["admin_chat_id"], text=text, parse_mode=ParseMode.HTML
108 )
109
110
111async def main() -> None:
112 """Set up the application and a custom webserver."""
113 url = "https://domain.tld"
114 admin_chat_id = 123456
115 port = 8000
116
117 context_types = ContextTypes(context=CustomContext)
118 # Here we set updater to None because we want our custom webhook server to handle the updates
119 # and hence we don't need an Updater instance
120 application = (
121 Application.builder().token("TOKEN").updater(None).context_types(context_types).build()
122 )
123 # save the values in `bot_data` such that we may easily access them in the callbacks
124 application.bot_data["url"] = url
125 application.bot_data["admin_chat_id"] = admin_chat_id
126
127 # register handlers
128 application.add_handler(CommandHandler("start", start))
129 application.add_handler(TypeHandler(type=WebhookUpdate, callback=webhook_update))
130
131 # Pass webhook settings to telegram
132 await application.bot.set_webhook(url=f"{url}/telegram")
133
134 # Set up webserver
135 async def telegram(request: Request) -> Response:
136 """Handle incoming Telegram updates by putting them into the `update_queue`"""
137 await application.update_queue.put(
138 Update.de_json(data=await request.json(), bot=application.bot)
139 )
140 return Response()
141
142 async def custom_updates(request: Request) -> PlainTextResponse:
143 """
144 Handle incoming webhook updates by also putting them into the `update_queue` if
145 the required parameters were passed correctly.
146 """
147 try:
148 user_id = int(request.query_params["user_id"])
149 payload = request.query_params["payload"]
150 except KeyError:
151 return PlainTextResponse(
152 status_code=HTTPStatus.BAD_REQUEST,
153 content="Please pass both `user_id` and `payload` as query parameters.",
154 )
155 except ValueError:
156 return PlainTextResponse(
157 status_code=HTTPStatus.BAD_REQUEST,
158 content="The `user_id` must be a string!",
159 )
160
161 await application.update_queue.put(WebhookUpdate(user_id=user_id, payload=payload))
162 return PlainTextResponse("Thank you for the submission! It's being forwarded.")
163
164 async def health(_: Request) -> PlainTextResponse:
165 """For the health endpoint, reply with a simple plain text message."""
166 return PlainTextResponse(content="The bot is still running fine :)")
167
168 starlette_app = Starlette(
169 routes=[
170 Route("/telegram", telegram, methods=["POST"]),
171 Route("/healthcheck", health, methods=["GET"]),
172 Route("/submitpayload", custom_updates, methods=["POST", "GET"]),
173 ]
174 )
175 webserver = uvicorn.Server(
176 config=uvicorn.Config(
177 app=starlette_app,
178 port=port,
179 use_colors=False,
180 host="127.0.0.1",
181 )
182 )
183
184 # Run application and webserver together
185 async with application:
186 await application.start()
187 await webserver.serve()
188 await application.stop()
189
190
191if __name__ == "__main__":
192 asyncio.run(main())