# Telebot V1
# Script and snippet: Telebot V1 — stored on openpaste.cloud
# Telegram bot that fetches data from AWS and Pixeldrain
import asyncio
import os

import boto3
import requests
from pyrogram import Client, filters
from pyrogram.types import Message
# == CONFIGURATION ==
# Every setting can be supplied through the environment so that secrets do not
# have to live in the script; the literals below are only fallbacks.
API_ID = int(os.environ.get("API_ID", "35781188"))  # from my.telegram.org
API_HASH = os.environ.get("API_HASH", "")
TOKEN = os.environ.get("BOT_TOKEN", "8780676399:")  # from BotFather
# Telegram user ID allowed to use the bot. The assignment previously had no
# value at all (a syntax error). The fallback 0 is not expected to match any
# real user, so the bot stays locked until OWNER_ID is provided.
OWNER_ID = int(os.environ.get("OWNER_ID", "0"))
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "/9")
AWS_REGION = os.environ.get("AWS_DEFAULT_REGION", "us-east-1")  # change to your region
S3_BUCKET = os.environ.get("S3_BUCKET", "man-logs")
# ===================
# Scratch directory for files in transit between the source and Telegram.
DOWNLOAD_DIR = "./temp"
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
# Module-wide S3 client built from the static credentials and region
# configured above.
s3 = boto3.client(
    service_name="s3",
    region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
)
# Pyrogram client authenticated with the bot token; "bot_storage" is the
# session name passed to Client.
app = Client(
    "bot_storage",
    bot_token=TOKEN,
    api_hash=API_HASH,
    api_id=API_ID,
)
def is_owner(message: Message) -> bool:
    """Return True when *message* was sent by the configured owner.

    A message without a sender (``message.from_user`` is ``None``) is never
    treated as coming from the owner; previously this case raised
    ``AttributeError``.
    """
    sender = message.from_user
    return sender is not None and sender.id == OWNER_ID
# Progress callback
async def progress(current, total, msg, text):
    """Edit *msg* to show how far a transfer has got.

    The handlers in this file pass this function as the ``progress`` callback
    of their upload calls, with ``(msg, text)`` as ``progress_args``.

    Args:
        current: Number of bytes transferred so far.
        total: Total number of bytes; may be 0, in which case 0.0% is shown.
        msg: Status message to update; must expose an awaitable ``edit_text``.
        text: Heading placed on the first line of the status message.
    """
    # Guard against total == 0 (e.g. an empty file), which used to raise
    # ZeroDivisionError inside the callback.
    persen = (current / total) * 100 if total else 0.0
    try:
        await msg.edit_text(
            f"{text}\n"
            f"Progress: {persen:.1f}%\n"
            f"{current/(1024*1024):.1f} MB / {total/(1024*1024):.1f} MB"
        )
    except Exception:
        # Best effort: a failed status edit must not abort the transfer.
        # Catching Exception (not a bare except) lets task cancellation and
        # KeyboardInterrupt propagate.
        pass
@app.on_message(filters.command("start") & filters.user(OWNER_ID))
async def start(client, message: Message):
    """Answer /start from the owner with the list of supported commands."""
    # NOTE(review): /s3videos is advertised here, but no handler for it is
    # visible in this file — confirm whether it lives elsewhere.
    help_lines = (
        "✅ Bot Storage Aktif!",
        "",
        "/s3list - List file S3",
        "/s3get - Ambil file S3",
        "/s3videos - List video S3",
        "/pdget - Ambil dari Pixeldrain",
    )
    await message.reply("\n".join(help_lines))
def _collect_s3_listing(prefix):
    """Return one formatted line per object under *prefix* in S3_BUCKET (blocking)."""
    paginator = s3.get_paginator("list_objects_v2")
    lines = []
    for page in paginator.paginate(Bucket=S3_BUCKET, Prefix=prefix):
        # Pages with no matching objects carry no "Contents" key.
        for obj in page.get("Contents", []):
            size_mb = obj["Size"] / (1024 * 1024)
            lines.append(f"• `{obj['Key']}` ({size_mb:.1f} MB)")
    return lines


def _paginate_listing(header, lines, max_lines=30, max_chars=4000):
    """Group *lines* into message texts bounded by line count and length.

    The first text starts with *header*. Each text holds at most *max_lines*
    lines and at most *max_chars* characters, keeping it below Telegram's
    message-length limit even when object keys are long.
    """
    messages = []
    lead = header  # only the first message carries the header
    batch = []
    size = len(lead)
    for line in lines:
        if batch and (len(batch) >= max_lines or size + len(line) + 1 > max_chars):
            messages.append(lead + "\n".join(batch))
            lead, batch, size = "", [], 0
        batch.append(line)
        size += len(line) + 1
    if batch:
        messages.append(lead + "\n".join(batch))
    return messages


@app.on_message(filters.command("s3list") & filters.user(OWNER_ID))
async def s3_list(client, message: Message):
    """List the objects in S3_BUCKET, optionally filtered by a key prefix.

    Usage: ``/s3list [prefix]``. The status message is edited to show the
    first part of the listing; any further parts are sent as replies.
    Errors are reported by editing the status message.
    """
    args = message.text.split()
    prefix = args[1] if len(args) > 1 else ""
    msg = await message.reply("⏳ Mengambil list...")
    try:
        # boto3 is synchronous; run the pagination in a worker thread so the
        # event loop keeps serving other updates meanwhile.
        loop = asyncio.get_running_loop()
        all_files = await loop.run_in_executor(None, _collect_s3_listing, prefix)
        if not all_files:
            await msg.edit("📂 Kosong")
            return
        header = f"📦 {S3_BUCKET} | Total: {len(all_files)} file\n\n"
        texts = _paginate_listing(header, all_files)
        await msg.edit(texts[0])
        for text in texts[1:]:
            await message.reply(text)
    except Exception as e:
        await msg.edit(f"❌ Error: {str(e)}")
@app.on_message(filters.command("s3get") & filters.user(OWNER_ID))
async def s3_get(client, message: Message):
    """Download one object from S3_BUCKET and send it to the chat.

    Usage: ``/s3get <key>``. The object is saved under DOWNLOAD_DIR, uploaded
    as video, audio or document depending on its extension, and the local
    copy is removed afterwards whether or not the transfer succeeded.
    Errors are reported by editing the status message.
    """
    args = message.text.split(None, 1)
    if len(args) < 2:
        await message.reply("Format: /s3get namafile.mp4")
        return
    s3_key = args[1].strip()
    filename = s3_key.split("/")[-1]
    if not filename:
        # A key ending in "/" has no file name; without this guard filepath
        # would be the download directory itself and cleanup would fail.
        await message.reply("Format: /s3get namafile.mp4")
        return
    filepath = os.path.join(DOWNLOAD_DIR, filename)
    msg = await message.reply(f"⏳ Downloading: `{s3_key}`")
    try:
        # boto3 is synchronous; run its calls in a worker thread so the
        # event loop is not blocked for the duration of the download.
        loop = asyncio.get_running_loop()
        head = await loop.run_in_executor(
            None, lambda: s3.head_object(Bucket=S3_BUCKET, Key=s3_key)
        )
        size_mb = head["ContentLength"] / (1024 * 1024)
        await msg.edit(f"⏳ Downloading {size_mb:.1f} MB dari S3...")
        # Download from S3 (no progress is reported for this step).
        await loop.run_in_executor(
            None, s3.download_file, S3_BUCKET, s3_key, filepath
        )
        await msg.edit(f"📤 Uploading {size_mb:.1f} MB ke Telegram...")
        ext = filename.lower().split(".")[-1]
        if ext in ["mp4", "mkv", "avi", "mov", "ts"]:
            await message.reply_video(
                video=filepath,
                caption=f"✅ {filename} ({size_mb:.1f} MB)",
                progress=progress,
                progress_args=(msg, "📤 Uploading..."),
            )
        elif ext in ["mp3", "m4a", "aac"]:
            await message.reply_audio(
                audio=filepath,
                caption=f"✅ {filename}",
                progress=progress,
                progress_args=(msg, "📤 Uploading..."),
            )
        else:
            await message.reply_document(
                document=filepath,
                caption=f"✅ {filename} ({size_mb:.1f} MB)",
                progress=progress,
                progress_args=(msg, "📤 Uploading..."),
            )
        await msg.edit(f"✅ Selesai: {filename} ({size_mb:.1f} MB)")
    except Exception as e:
        await msg.edit(f"❌ Error: {str(e)}")
    finally:
        # isfile (not exists) so a directory is never passed to os.remove.
        if os.path.isfile(filepath):
            os.remove(filepath)
def _pixeldrain_info(file_id):
    """Fetch the metadata JSON for *file_id* from Pixeldrain (blocking)."""
    resp = requests.get(
        f"https://pixeldrain.com/api/file/{file_id}/info", timeout=30
    )
    # Fail loudly on 4xx/5xx instead of treating an error body as metadata.
    resp.raise_for_status()
    return resp.json()


def _pixeldrain_download(file_id, filepath):
    """Stream the content of *file_id* from Pixeldrain into *filepath* (blocking)."""
    url = f"https://pixeldrain.com/api/file/{file_id}"
    # The context manager releases the connection even if writing fails.
    with requests.get(url, stream=True, timeout=30) as r:
        r.raise_for_status()
        with open(filepath, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)


@app.on_message(filters.command("pdget") & filters.user(OWNER_ID))
async def pd_get(client, message: Message):
    """Download a file from Pixeldrain and send it to the chat.

    Usage: ``/pdget <file id or URL>``; only the part after the last "/" is
    used as the file ID. The file is saved under DOWNLOAD_DIR, uploaded as
    video or document depending on its extension, and the local copy is
    removed afterwards. Errors are reported by editing the status message.
    """
    args = message.text.split()
    if len(args) < 2:
        await message.reply("Format: /pdget FILEID")
        return
    file_id = args[1].split("/")[-1]
    if not file_id:
        # e.g. a URL with a trailing slash leaves no ID to look up.
        await message.reply("Format: /pdget FILEID")
        return
    msg = await message.reply("⏳ Mengambil info Pixeldrain...")
    # Bound before the try block so the cleanup in `finally` cannot hit an
    # unbound local when the info request itself fails.
    filepath = None
    try:
        # requests is synchronous; run it in a worker thread so the event
        # loop is not blocked while waiting on the network.
        loop = asyncio.get_running_loop()
        info = await loop.run_in_executor(None, _pixeldrain_info, file_id)
        # The name comes from the remote service: keep only its final path
        # component so it cannot point outside DOWNLOAD_DIR.
        filename = os.path.basename(info.get("name") or "") or file_id
        size_mb = info.get("size", 0) / (1024 * 1024)
        filepath = os.path.join(DOWNLOAD_DIR, filename)
        await msg.edit(f"⏳ Downloading {filename} ({size_mb:.1f} MB)...")
        await loop.run_in_executor(None, _pixeldrain_download, file_id, filepath)
        await msg.edit("📤 Uploading ke Telegram...")
        ext = filename.lower().split(".")[-1]
        if ext in ["mp4", "mkv", "avi", "mov", "ts"]:
            await message.reply_video(
                video=filepath,
                caption=f"✅ {filename} ({size_mb:.1f} MB)",
                progress=progress,
                progress_args=(msg, "📤 Uploading..."),
            )
        else:
            await message.reply_document(
                document=filepath,
                caption=f"✅ {filename} ({size_mb:.1f} MB)",
                progress=progress,
                progress_args=(msg, "📤 Uploading..."),
            )
        await msg.edit(f"✅ Selesai: {filename}")
    except Exception as e:
        await msg.edit(f"❌ Error: {str(e)}")
    finally:
        if filepath and os.path.isfile(filepath):
            os.remove(filepath)
# Announce startup on stdout, then hand control to Pyrogram.
# NOTE(review): app.run() presumably blocks until the bot is stopped — confirm
# against the Pyrogram version in use.
print("✅ Bot jalan dengan Pyrogram...")
app.run()