yt-dlp-telegram-bot/queue.go
2026-03-02 14:19:12 +08:00

546 lines
17 KiB
Go
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/dustin/go-humanize"
"github.com/gotd/td/telegram/message"
"github.com/gotd/td/tg"
)
// Status prefixes shown to the user in the bot's reply message, followed by
// the settings that control how download progress is rendered.
const (
	processStartStr = "🔍 Getting information..."
	processStr      = "🔨 Processing"
	uploadStr       = "☁️ Uploading"
	uploadDoneStr   = "🏁 Uploading"
	errorStr        = "❌ Error"
	canceledStr     = "❌ Canceled"

	// maxProgressPercentUpdateInterval is the interval used to throttle
	// progress edits: an update arriving sooner than this after the previous
	// one is deferred with a timer instead of being sent immediately.
	maxProgressPercentUpdateInterval = time.Second

	// progressBarLength is the length argument handed to getProgressbar.
	progressBarLength = 10
)
// DownloadQueueEntry describes one queued request together with everything
// needed to report its status back to the chat it came from.
type DownloadQueueEntry struct {
	// URL is the requested URL; it is printed when processing starts.
	URL string

	// Format is the requested output format. The value "mp3" selects the
	// mp3 upload format; it is also handed to the downloader and converter.
	Format string

	// IsVideoMessage is true if this entry is for a video message download.
	IsVideoMessage bool

	// VideoDocument stores the video document info for video messages.
	VideoDocument *tg.Document

	// OrigEntities and OrigMsgUpdate are the entities and update of the
	// triggering message; they are used to build replies and uploads.
	OrigEntities  tg.Entities
	OrigMsgUpdate *tg.UpdateNewMessage

	// OrigMsg is the message carried by OrigMsgUpdate.
	OrigMsg *tg.Message

	// FromUser and FromGroup are the values returned by resolveMsgSrc for
	// OrigMsg.
	FromUser  *tg.PeerUser
	FromGroup *tg.PeerChat

	// Reply is the builder used to send and later edit the status reply;
	// ReplyMsg is the sent reply, whose ID addresses those edits.
	Reply    *message.Builder
	ReplyMsg *tg.UpdateShortSentMessage

	// Ctx and CtxCancel are assigned by the queue processor when the entry
	// starts being processed; they are unset while the entry is waiting.
	Ctx       context.Context
	CtxCancel context.CancelFunc

	// Canceled is set when the entry has been canceled.
	Canceled bool
}
// sendTypingAction is currently a no-op; it is invoked after every reply
// edit (see editReply).
func (e *DownloadQueueEntry) sendTypingAction(ctx context.Context) {
	// Intentionally empty.
}
// sendTypingCancelAction is currently a no-op; callers invoke it once an
// entry no longer needs a typing indicator.
func (e *DownloadQueueEntry) sendTypingCancelAction(ctx context.Context) {
	// Intentionally empty.
}
// editReply replaces the text of the entry's status reply with s and then
// calls sendTypingAction. The edit is best effort: its result and error are
// deliberately discarded.
func (e *DownloadQueueEntry) editReply(ctx context.Context, s string) {
	editor := e.Reply.Edit(e.ReplyMsg.ID)
	_, _ = editor.Text(ctx, s)
	e.sendTypingAction(ctx)
}
// currentlyDownloadedEntryType holds the progress-reporting state of the
// entry that is being processed right now. The queue processor resets it to
// its zero value before each entry.
type currentlyDownloadedEntryType struct {
	// disableProgressPercentUpdate, once true, makes all further progress
	// updates (including pending timer callbacks) no-ops.
	disableProgressPercentUpdate bool

	// progressPercentUpdateMutex is held while the progress fields are read
	// or written by the progress handler and its timer callback.
	progressPercentUpdateMutex sync.Mutex

	// lastProgressPercentUpdateAt is when a progress edit was last sent.
	lastProgressPercentUpdateAt time.Time

	// lastProgressPercent is the most recently reported percentage;
	// lastDisplayedProgressPercent is the last positive one actually shown.
	lastProgressPercent          int
	lastDisplayedProgressPercent int

	// progressUpdateTimer is the pending deferred update, if any.
	progressUpdateTimer *time.Timer

	// sourceCodecInfo and progressInfo are text fragments appended to the
	// progress message.
	sourceCodecInfo string
	progressInfo    string
}
// DownloadQueue is a FIFO of download requests that are processed one at a
// time by a background goroutine started from Init.
type DownloadQueue struct {
	// ctx is the context given to Init; it is used for reply edits and as
	// the parent of each entry's timeout context.
	ctx context.Context

	// mutex guards entries. entries[0], when present, is the entry being
	// processed (or about to be); the rest are waiting.
	mutex   sync.Mutex
	entries []DownloadQueueEntry

	// processReqChan wakes the processor when an entry has been added.
	processReqChan chan bool

	// currentlyDownloadedEntry is the progress state of entries[0].
	currentlyDownloadedEntry currentlyDownloadedEntryType
}
// getQueuePositionString returns the status text shown to a user whose
// request is waiting at queue position pos.
func (q *DownloadQueue) getQueuePositionString(pos int) string {
	return fmt.Sprintf("👨‍👦‍👦 Request queued at position #%d", pos)
}
// Add queues a URL download in the given format for the message in u.
func (q *DownloadQueue) Add(ctx context.Context, entities tg.Entities, u *tg.UpdateNewMessage, url, format string) {
	const isVideo = false
	q.addEntry(ctx, entities, u, url, format, isVideo, nil)
}
// AddVideo queues the download of a video message. The entry carries
// videoDocument and has an empty URL and format.
func (q *DownloadQueue) AddVideo(ctx context.Context, entities tg.Entities, u *tg.UpdateNewMessage, videoDocument *tg.Document) {
	const isVideo = true
	q.addEntry(ctx, entities, u, "", "", isVideo, videoDocument)
}
// addEntry sends the initial status reply for a request and appends the
// request to the queue, then wakes the processor.
//
// The reply text depends on the queue state: if nothing is queued the
// request starts right away, otherwise the user is told its position. The
// queue mutex stays held while the reply is sent so the reported position
// matches the slot the entry ends up in.
//
// The request is dropped (with a log line) instead of panicking when the
// update does not carry a *tg.Message, when sending the reply fails, or when
// the send result is not a *tg.UpdateShortSentMessage — without that result
// there is no message ID to edit later.
func (q *DownloadQueue) addEntry(ctx context.Context, entities tg.Entities, u *tg.UpdateNewMessage, url, format string, isVideo bool, videoDocument *tg.Document) {
	origMsg, ok := u.Message.(*tg.Message)
	if !ok {
		fmt.Println(" can't queue request: update does not contain a regular message")
		return
	}

	queued := func() bool {
		q.mutex.Lock()
		defer q.mutex.Unlock()

		var replyStr string
		switch {
		case len(q.entries) > 0:
			fmt.Println(" queueing request at position #", len(q.entries))
			replyStr = q.getQueuePositionString(len(q.entries))
		case isVideo:
			replyStr = "⬇️ Downloading video..."
		default:
			replyStr = processStartStr
		}

		newEntry := DownloadQueueEntry{
			URL:            url,
			Format:         format,
			IsVideoMessage: isVideo,
			VideoDocument:  videoDocument,
			OrigEntities:   entities,
			OrigMsgUpdate:  u,
			OrigMsg:        origMsg,
			Reply:          telegramSender.Reply(entities, u),
		}

		sent, err := newEntry.Reply.Text(ctx, replyStr)
		if err != nil {
			fmt.Println(" can't queue request: error sending reply:", err)
			return false
		}
		replyMsg, ok := sent.(*tg.UpdateShortSentMessage)
		if !ok {
			fmt.Printf(" can't queue request: unexpected reply result type %T\n", sent)
			return false
		}
		newEntry.ReplyMsg = replyMsg
		newEntry.FromUser, newEntry.FromGroup = resolveMsgSrc(origMsg)

		q.entries = append(q.entries, newEntry)
		return true
	}()
	if !queued {
		return
	}

	// Non-blocking wake-up: if the processor is busy it will find the new
	// entry on its own when it finishes the current one.
	select {
	case q.processReqChan <- true:
	default:
	}
}
// CancelCurrentEntry cancels the entry at the head of the queue, or replies
// with an error to the message in u if the queue is empty.
//
// The head entry's CtxCancel is only assigned once the processor has started
// working on it, so it is checked for nil before being called; the Canceled
// flag is set either way. The url parameter is unused.
func (q *DownloadQueue) CancelCurrentEntry(ctx context.Context, entities tg.Entities, u *tg.UpdateNewMessage, url string) {
	q.mutex.Lock()
	if len(q.entries) > 0 {
		q.entries[0].Canceled = true
		if cancel := q.entries[0].CtxCancel; cancel != nil {
			cancel()
		}
		q.mutex.Unlock()
		return
	}
	q.mutex.Unlock()

	// The queue lock is released before replying so a slow network call
	// cannot stall the queue.
	fmt.Println(" no active request to cancel")
	_, _ = telegramSender.Reply(entities, u).Text(ctx, errorStr+": no active request to cancel")
}
// updateProgress rewrites qEntry's status reply for the given progress.
//
// A negative progressPercent renders a "no progress available" message, zero
// renders progressStr without a bar, and any positive value renders a
// progress bar and is recorded as the last displayed percentage. The current
// source codec info is appended in every case.
func (q *DownloadQueue) updateProgress(ctx context.Context, qEntry *DownloadQueueEntry, progressStr string, progressPercent int) {
	cur := &q.currentlyDownloadedEntry

	switch {
	case progressPercent < 0:
		qEntry.editReply(ctx, progressStr+"... (no progress available)\n"+cur.sourceCodecInfo)
	case progressPercent == 0:
		qEntry.editReply(ctx, progressStr+"..."+cur.progressInfo+"\n"+cur.sourceCodecInfo)
	default:
		fmt.Printf(" progress: %d%%\n", progressPercent)
		bar := getProgressbar(progressPercent, progressBarLength)
		qEntry.editReply(ctx, progressStr+": "+bar+cur.progressInfo+"\n"+cur.sourceCodecInfo)
		cur.lastDisplayedProgressPercent = progressPercent
	}
}
// HandleProgressPercentUpdate is the progress callback handed to the
// downloader and converter. It throttles reply edits to at most one per
// maxProgressPercentUpdateInterval.
//
// Repeated values and updates arriving after progress reporting has been
// disabled are ignored. A negative percentage is shown once and disables all
// further updates. Otherwise the update is shown immediately if enough time
// has passed since the last edit, or scheduled on a timer for the remainder
// of the interval; a newer update replaces a pending one.
func (q *DownloadQueue) HandleProgressPercentUpdate(progressStr string, progressPercent int) {
	cur := &q.currentlyDownloadedEntry

	cur.progressPercentUpdateMutex.Lock()
	defer cur.progressPercentUpdateMutex.Unlock()

	if cur.disableProgressPercentUpdate || cur.lastProgressPercent == progressPercent {
		return
	}
	cur.lastProgressPercent = progressPercent

	if progressPercent < 0 {
		cur.disableProgressPercentUpdate = true
		q.updateActiveEntryProgress(progressStr, progressPercent)
		return
	}

	// Drop any pending deferred update; this one supersedes it. Timers made
	// by time.AfterFunc have a nil channel, so there is nothing to drain.
	if cur.progressUpdateTimer != nil {
		cur.progressUpdateTimer.Stop()
	}

	elapsed := time.Since(cur.lastProgressPercentUpdateAt)
	if elapsed < maxProgressPercentUpdateInterval {
		cur.progressUpdateTimer = time.AfterFunc(maxProgressPercentUpdateInterval-elapsed, func() {
			cur.progressPercentUpdateMutex.Lock()
			defer cur.progressPercentUpdateMutex.Unlock()

			if cur.disableProgressPercentUpdate {
				return
			}
			q.updateActiveEntryProgress(progressStr, progressPercent)
			cur.lastProgressPercentUpdateAt = time.Now()
		})
		return
	}

	q.updateActiveEntryProgress(progressStr, progressPercent)
	cur.lastProgressPercentUpdateAt = time.Now()
}

// updateActiveEntryProgress shows the given progress on the entry at the head
// of the queue. It does nothing when the queue is empty, which can happen if
// a deferred update fires after its entry has already been removed.
func (q *DownloadQueue) updateActiveEntryProgress(progressStr string, progressPercent int) {
	q.mutex.Lock()
	if len(q.entries) == 0 {
		q.mutex.Unlock()
		return
	}
	qEntry := &q.entries[0]
	q.mutex.Unlock()

	q.updateProgress(q.ctx, qEntry, progressStr, progressPercent)
}
// processQueueEntry handles one queue entry from start to finish.
//
// Video-message entries are delegated to processVideoMessageEntry. For URL
// entries the media is downloaded, probed, converted when the converter
// says so, and uploaded to Telegram unless the downloaded file is at least
// telegramUploadThreshold bytes, in which case it is only kept on disk.
// Downloaded and converted files are left in place afterwards.
//
// ctx is used for status edits; the entry's own Ctx (which carries the
// timeout and user cancellation) is used for the download, conversion and
// upload. Every exit path disables further progress updates so that a
// still-pending deferred update cannot touch the reply afterwards.
func (q *DownloadQueue) processQueueEntry(ctx context.Context, qEntry *DownloadQueueEntry) {
	// FromUser is not guaranteed to be set for every message source, so it
	// is only dereferenced when present.
	var fromUsername string
	if qEntry.FromUser != nil {
		fromUsername = getFromUsername(qEntry.OrigEntities, qEntry.FromUser.UserID)
	}
	fmt.Print("processing request by")
	if fromUsername != "" {
		fmt.Print(" from ", fromUsername, "#", qEntry.FromUser.UserID)
	}

	// Handle video message downloads differently.
	if qEntry.IsVideoMessage {
		fmt.Println(": [video message]")
		q.processVideoMessageEntry(ctx, qEntry)
		return
	}

	fmt.Println(":", qEntry.URL)
	qEntry.editReply(ctx, processStartStr)

	downloader := Downloader{
		ConvertStartFunc: func(ctx context.Context, videoCodecs, audioCodecs, convertActionsNeeded string) {
			info := "🎬 Source: " + videoCodecs
			if audioCodecs == "" {
				info += ", no audio"
			} else {
				if videoCodecs != "" {
					info += " / "
				}
				info += audioCodecs
			}
			if convertActionsNeeded == "" {
				info += " (no conversion needed)"
			} else {
				info += " (converting: " + convertActionsNeeded + ")"
			}
			q.currentlyDownloadedEntry.sourceCodecInfo = info
			qEntry.editReply(ctx, "🎬 Preparing download...\n"+info)
		},
		UpdateProgressPercentFunc: q.HandleProgressPercentUpdate,
	}

	// Download the file to SAVE_DIR.
	dlResult, err := downloader.DownloadAndConvertURL(qEntry.Ctx, qEntry.OrigMsg.Message, qEntry.Format)
	if err != nil {
		q.reportEntryError(ctx, qEntry, " error downloading:", err)
		return
	}

	// Probe the downloaded file to check codec compatibility.
	conv := Converter{
		Format:                        qEntry.Format,
		UpdateProgressPercentCallback: q.HandleProgressPercentUpdate,
	}
	if err := conv.ProbeFile(dlResult.FilePath); err != nil {
		q.reportEntryError(ctx, qEntry, " error probing file:", err)
		return
	}

	// Update codec info in UI.
	if downloader.ConvertStartFunc != nil {
		downloader.ConvertStartFunc(ctx, conv.VideoCodecs, conv.AudioCodecs, conv.GetActionsNeeded())
	}

	// Check if file is small enough to upload to Telegram (<512MB).
	if dlResult.FileSize >= telegramUploadThreshold {
		// File too large, only save to disk.
		fmt.Printf(" file too large (%s >= 512MB), skipping Telegram upload\n", humanize.Bytes(uint64(dlResult.FileSize)))
		q.disableProgressUpdates()
		qEntry.editReply(ctx, fmt.Sprintf("✅ Saved to server\n📁 %s\n💾 Size: %s\n⚠ File too large for Telegram upload (>512MB)",
			dlResult.FilePath, humanize.Bytes(uint64(dlResult.FileSize))))
		qEntry.sendTypingCancelAction(ctx)
		return
	}

	// File is small enough, process for upload.
	fmt.Println(" processing for upload...")
	q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock()
	q.updateProgress(ctx, qEntry, processStr, q.currentlyDownloadedEntry.lastProgressPercent)
	q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock()

	// Pick the upload format: mp3 requests are always "mp3"; otherwise the
	// file extension decides, with "mkv" as the fallback.
	uploadPath := dlResult.FilePath
	uploadFormat := "mkv"
	if qEntry.Format == "mp3" {
		uploadFormat = "mp3"
	} else if ext := filepath.Ext(uploadPath); len(ext) > 1 {
		uploadFormat = ext[1:] // Remove the leading dot.
	}

	if conv.NeedConvert() {
		outputPath, outputFormat, err := conv.ConvertIfNeeded(qEntry.Ctx, dlResult.FilePath, params.SaveDir)
		if err != nil {
			q.reportEntryError(ctx, qEntry, " error converting:", err)
			return
		}
		// Both the original and the converted file are kept on disk.
		uploadPath = outputPath
		uploadFormat = outputFormat
	}

	// Open file for upload.
	file, err := os.Open(uploadPath)
	if err != nil {
		q.reportEntryError(ctx, qEntry, " error opening file:", err)
		return
	}
	defer file.Close()

	// Upload to Telegram with video dimensions.
	err = dlUploader.UploadFile(qEntry.Ctx, qEntry.OrigEntities, qEntry.OrigMsgUpdate, file, uploadFormat, dlResult.Title, conv.VideoWidth, conv.VideoHeight)
	if err != nil {
		q.reportEntryError(ctx, qEntry, " error uploading:", err)
		return
	}

	// The uploaded file stays in SAVE_DIR.

	q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock()
	q.currentlyDownloadedEntry.disableProgressPercentUpdate = true
	if qEntry.Canceled {
		fmt.Print(" canceled\n")
		q.updateProgress(ctx, qEntry, canceledStr, q.currentlyDownloadedEntry.lastProgressPercent)
	} else if q.currentlyDownloadedEntry.lastDisplayedProgressPercent < 100 {
		fmt.Print(" progress: 100%\n")
		q.updateProgress(ctx, qEntry, uploadDoneStr, 100)
	}
	q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock()
	qEntry.sendTypingCancelAction(ctx)
}

// disableProgressUpdates turns off progress reporting for the current entry,
// which also neutralizes any deferred update that is still pending.
func (q *DownloadQueue) disableProgressUpdates() {
	q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock()
	q.currentlyDownloadedEntry.disableProgressPercentUpdate = true
	q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock()
}

// reportEntryError logs err with the given prefix, disables progress updates
// and shows the error in the entry's status reply.
func (q *DownloadQueue) reportEntryError(ctx context.Context, qEntry *DownloadQueueEntry, logPrefix string, err error) {
	fmt.Println(logPrefix, err)
	q.disableProgressUpdates()
	qEntry.editReply(ctx, fmt.Sprint(errorStr+": ", err))
}
// processVideoMessageEntry saves the video document attached to qEntry into
// params.SaveDir, fetching it from Telegram in chunks of up to 1 MiB.
//
// The file is stored as YYYY-MM-DD-{unix timestamp}{ext}, with a numeric
// suffix added when that name is already taken; the document's own file name
// only contributes the extension (".mp4" if it has none) and is otherwise
// used for logging. Progress is shown each time another 10% is completed.
// On cancellation or any error the partial file is removed and the reply is
// updated accordingly.
//
// ctx is used for status edits, qEntry.Ctx for the download itself.
func (q *DownloadQueue) processVideoMessageEntry(ctx context.Context, qEntry *DownloadQueueEntry) {
	videoFile := qEntry.VideoDocument
	if videoFile == nil {
		qEntry.editReply(ctx, errorStr+": could not get video info")
		return
	}

	now := time.Now()

	// Get the original file name, if the document has one.
	var fileName string
	for _, attr := range videoFile.Attributes {
		if docAttr, ok := attr.(*tg.DocumentAttributeFilename); ok {
			fileName = docAttr.FileName
			break
		}
	}
	if fileName == "" {
		fileName = fmt.Sprintf("video_%d.mp4", now.Unix())
	}
	ext := filepath.Ext(fileName)
	if ext == "" {
		ext = ".mp4"
	}

	// Update progress message.
	qEntry.editReply(ctx, "⬇️ Downloading video...")

	// Create the destination file. O_EXCL makes the existence check and the
	// creation a single step, so an existing file is never overwritten.
	baseName := fmt.Sprintf("%s-%d", now.Format("2006-01-02"), now.Unix())
	var (
		safeFileName string
		filePath     string
		file         *os.File
		err          error
	)
	for i := 0; i < 1000; i++ {
		if i == 0 {
			safeFileName = baseName + ext
		} else {
			safeFileName = fmt.Sprintf("%s-%d%s", baseName, i, ext)
		}
		filePath = filepath.Join(params.SaveDir, safeFileName)
		file, err = os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0666)
		if err == nil || !os.IsExist(err) {
			break
		}
	}
	if err != nil {
		fmt.Println(" error creating file:", err)
		qEntry.editReply(ctx, errorStr+": could not create file")
		return
	}

	// abort discards the partial download and reports msg to the user.
	abort := func(msg string) {
		file.Close()
		os.Remove(filePath)
		qEntry.editReply(ctx, msg)
	}

	fileSize := videoFile.Size
	fmt.Printf(" downloading video: %s (size: %s)\n", fileName, humanize.Bytes(uint64(fileSize)))

	loc := &tg.InputDocumentFileLocation{
		ID:            videoFile.ID,
		AccessHash:    videoFile.AccessHash,
		FileReference: videoFile.FileReference,
	}

	const maxChunkSize = int64(1024 * 1024) // 1MB chunks
	written := int64(0)
	lastStep := 0 // number of completed 10% steps already shown
	for written < fileSize {
		select {
		case <-qEntry.Ctx.Done():
			abort(canceledStr)
			qEntry.Canceled = true
			return
		default:
		}

		chunkSize := maxChunkSize
		if remaining := fileSize - written; remaining < chunkSize {
			// Telegram API requires limit to be divisible by 1KB: round the
			// tail down to a 1KB boundary, or request a full 1KB when less
			// than that is left (the API then returns just the remainder).
			chunkSize = (remaining / 1024) * 1024
			if chunkSize == 0 {
				chunkSize = 1024
			}
		}

		chunk, err := telegramClient.API().UploadGetFile(qEntry.Ctx, &tg.UploadGetFileRequest{
			Location:     loc,
			Offset:       written,
			Limit:        int(chunkSize),
			Precise:      true,
			CDNSupported: false,
		})
		if err != nil {
			abort(errorStr + ": failed to download chunk: " + err.Error())
			return
		}
		chunkData, ok := chunk.(*tg.UploadFile)
		if !ok {
			abort(errorStr + ": unexpected response type")
			return
		}
		// An empty chunk before the expected size is reached would otherwise
		// make this loop spin forever.
		if len(chunkData.Bytes) == 0 {
			abort(errorStr + ": received empty chunk before end of file")
			return
		}

		n, err := file.Write(chunkData.Bytes)
		if err != nil {
			abort(errorStr + ": failed to write to file")
			return
		}
		written += int64(n)

		// Update progress whenever another 10% boundary has been crossed.
		percent := int(float64(written) * 100 / float64(fileSize))
		if step := percent / 10; step != lastStep {
			lastStep = step
			qEntry.editReply(ctx, "⬇️ Downloading video...\n"+getProgressbar(percent, progressBarLength))
		}
	}

	// A failed close can mean the data never reached the disk.
	if err := file.Close(); err != nil {
		os.Remove(filePath)
		qEntry.editReply(ctx, errorStr+": failed to write to file")
		return
	}

	// Send success message.
	savedMsg := fmt.Sprintf("✅ Video saved\n📁 %s\n💾 Size: %s", safeFileName, humanize.Bytes(uint64(written)))
	qEntry.editReply(ctx, savedMsg)
	fmt.Printf(" video saved to: %s\n", filePath)
}
// processor is the queue's worker loop; Init runs it in its own goroutine.
//
// While the queue is empty it sleeps until an entry is added or the queue's
// context is canceled, in which case it returns. Otherwise it refreshes the
// position shown to every waiting entry, gives the head entry a context with
// the downloadAndConvertTimeout timeout, processes it with the queue mutex
// released, and finally removes it from the queue.
func (q *DownloadQueue) processor() {
	for {
		q.mutex.Lock()
		if len(q.entries) == 0 {
			q.mutex.Unlock()
			select {
			case <-q.processReqChan:
			case <-q.ctx.Done():
				// Nothing is queued and the queue is shutting down.
				return
			}
			continue
		}

		// Updating queue positions for all waiting entries.
		for i := 1; i < len(q.entries); i++ {
			q.entries[i].editReply(q.ctx, q.getQueuePositionString(i))
			q.entries[i].sendTypingCancelAction(q.ctx)
		}

		q.entries[0].Ctx, q.entries[0].CtxCancel = context.WithTimeout(q.ctx, downloadAndConvertTimeout)
		qEntry := &q.entries[0]
		q.mutex.Unlock()

		q.currentlyDownloadedEntry = currentlyDownloadedEntryType{}
		q.processQueueEntry(q.ctx, qEntry)

		q.mutex.Lock()
		// Release the resources held by the entry's timeout context.
		q.entries[0].CtxCancel()
		q.entries = q.entries[1:]
		if len(q.entries) == 0 {
			fmt.Print("finished queue processing\n")
		}
		q.mutex.Unlock()
	}
}
// Init stores ctx as the queue's context and starts the background
// processor goroutine. It must be called before entries are added.
//
// The wake-up channel has a buffer of one so that a signal sent while the
// processor is not yet waiting on the channel is kept rather than dropped;
// with an unbuffered channel and a non-blocking sender, an entry added in
// that window would sit in the queue until the next one arrived.
func (q *DownloadQueue) Init(ctx context.Context) {
	q.ctx = ctx
	q.processReqChan = make(chan bool, 1)
	go q.processor()
}