yt-dlp-telegram-bot/queue.go
2026-03-02 13:11:38 +08:00

370 lines
12 KiB
Go
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/dustin/go-humanize"
"github.com/gotd/td/telegram/message"
"github.com/gotd/td/tg"
)
const processStartStr = "🔍 Getting information..."
const processStr = "🔨 Processing"
const uploadStr = "☁️ Uploading"
const uploadDoneStr = "🏁 Uploading"
const errorStr = "❌ Error"
const canceledStr = "❌ Canceled"
const maxProgressPercentUpdateInterval = time.Second
const progressBarLength = 10
type DownloadQueueEntry struct {
URL string
Format string
OrigEntities tg.Entities
OrigMsgUpdate *tg.UpdateNewMessage
OrigMsg *tg.Message
FromUser *tg.PeerUser
FromGroup *tg.PeerChat
Reply *message.Builder
ReplyMsg *tg.UpdateShortSentMessage
Ctx context.Context
CtxCancel context.CancelFunc
Canceled bool
}
func (e *DownloadQueueEntry) sendTypingAction(ctx context.Context) {
}
func (e *DownloadQueueEntry) sendTypingCancelAction(ctx context.Context) {
}
func (e *DownloadQueueEntry) editReply(ctx context.Context, s string) {
_, _ = e.Reply.Edit(e.ReplyMsg.ID).Text(ctx, s)
e.sendTypingAction(ctx)
}
type currentlyDownloadedEntryType struct {
disableProgressPercentUpdate bool
progressPercentUpdateMutex sync.Mutex
lastProgressPercentUpdateAt time.Time
lastProgressPercent int
lastDisplayedProgressPercent int
progressUpdateTimer *time.Timer
sourceCodecInfo string
progressInfo string
}
type DownloadQueue struct {
ctx context.Context
mutex sync.Mutex
entries []DownloadQueueEntry
processReqChan chan bool
currentlyDownloadedEntry currentlyDownloadedEntryType
}
func (e *DownloadQueue) getQueuePositionString(pos int) string {
return "👨‍👦‍👦 Request queued at position #" + fmt.Sprint(pos)
}
func (q *DownloadQueue) Add(ctx context.Context, entities tg.Entities, u *tg.UpdateNewMessage, url, format string) {
q.mutex.Lock()
var replyStr string
if len(q.entries) == 0 {
replyStr = processStartStr
} else {
fmt.Println(" queueing request at position #", len(q.entries))
replyStr = q.getQueuePositionString(len(q.entries))
}
newEntry := DownloadQueueEntry{
URL: url,
Format: format,
OrigEntities: entities,
OrigMsgUpdate: u,
OrigMsg: u.Message.(*tg.Message),
}
newEntry.Reply = telegramSender.Reply(entities, u)
replyText, _ := newEntry.Reply.Text(ctx, replyStr)
newEntry.ReplyMsg = replyText.(*tg.UpdateShortSentMessage)
newEntry.FromUser, newEntry.FromGroup = resolveMsgSrc(newEntry.OrigMsg)
q.entries = append(q.entries, newEntry)
q.mutex.Unlock()
select {
case q.processReqChan <- true:
default:
}
}
func (q *DownloadQueue) CancelCurrentEntry(ctx context.Context, entities tg.Entities, u *tg.UpdateNewMessage, url string) {
q.mutex.Lock()
if len(q.entries) > 0 {
q.entries[0].Canceled = true
q.entries[0].CtxCancel()
} else {
fmt.Println(" no active request to cancel")
_, _ = telegramSender.Reply(entities, u).Text(ctx, errorStr+": no active request to cancel")
}
q.mutex.Unlock()
}
func (q *DownloadQueue) updateProgress(ctx context.Context, qEntry *DownloadQueueEntry, progressStr string, progressPercent int) {
if progressPercent < 0 {
qEntry.editReply(ctx, progressStr+"... (no progress available)\n"+q.currentlyDownloadedEntry.sourceCodecInfo)
return
}
if progressPercent == 0 {
qEntry.editReply(ctx, progressStr+"..."+q.currentlyDownloadedEntry.progressInfo+"\n"+q.currentlyDownloadedEntry.sourceCodecInfo)
return
}
fmt.Print(" progress: ", progressPercent, "%\n")
qEntry.editReply(ctx, progressStr+": "+getProgressbar(progressPercent, progressBarLength)+q.currentlyDownloadedEntry.progressInfo+"\n"+q.currentlyDownloadedEntry.sourceCodecInfo)
q.currentlyDownloadedEntry.lastDisplayedProgressPercent = progressPercent
}
func (q *DownloadQueue) HandleProgressPercentUpdate(progressStr string, progressPercent int) {
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock()
defer q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock()
if q.currentlyDownloadedEntry.disableProgressPercentUpdate || q.currentlyDownloadedEntry.lastProgressPercent == progressPercent {
return
}
q.currentlyDownloadedEntry.lastProgressPercent = progressPercent
if progressPercent < 0 {
q.currentlyDownloadedEntry.disableProgressPercentUpdate = true
q.updateProgress(q.ctx, &q.entries[0], progressStr, progressPercent)
return
}
if q.currentlyDownloadedEntry.progressUpdateTimer != nil {
q.currentlyDownloadedEntry.progressUpdateTimer.Stop()
select {
case <-q.currentlyDownloadedEntry.progressUpdateTimer.C:
default:
}
}
timeElapsedSinceLastUpdate := time.Since(q.currentlyDownloadedEntry.lastProgressPercentUpdateAt)
if timeElapsedSinceLastUpdate < maxProgressPercentUpdateInterval {
q.currentlyDownloadedEntry.progressUpdateTimer = time.AfterFunc(maxProgressPercentUpdateInterval-timeElapsedSinceLastUpdate, func() {
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock()
if !q.currentlyDownloadedEntry.disableProgressPercentUpdate {
q.updateProgress(q.ctx, &q.entries[0], progressStr, progressPercent)
q.currentlyDownloadedEntry.lastProgressPercentUpdateAt = time.Now()
}
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock()
})
return
}
q.updateProgress(q.ctx, &q.entries[0], progressStr, progressPercent)
q.currentlyDownloadedEntry.lastProgressPercentUpdateAt = time.Now()
}
func (q *DownloadQueue) processQueueEntry(ctx context.Context, qEntry *DownloadQueueEntry) {
fromUsername := getFromUsername(qEntry.OrigEntities, qEntry.FromUser.UserID)
fmt.Print("processing request by")
if fromUsername != "" {
fmt.Print(" from ", fromUsername, "#", qEntry.FromUser.UserID)
}
fmt.Println(":", qEntry.URL)
qEntry.editReply(ctx, processStartStr)
downloader := Downloader{
ConvertStartFunc: func(ctx context.Context, videoCodecs, audioCodecs, convertActionsNeeded string) {
q.currentlyDownloadedEntry.sourceCodecInfo = "🎬 Source: " + videoCodecs
if audioCodecs == "" {
q.currentlyDownloadedEntry.sourceCodecInfo += ", no audio"
} else {
if videoCodecs != "" {
q.currentlyDownloadedEntry.sourceCodecInfo += " / "
}
q.currentlyDownloadedEntry.sourceCodecInfo += audioCodecs
}
if convertActionsNeeded == "" {
q.currentlyDownloadedEntry.sourceCodecInfo += " (no conversion needed)"
} else {
q.currentlyDownloadedEntry.sourceCodecInfo += " (converting: " + convertActionsNeeded + ")"
}
qEntry.editReply(ctx, "🎬 Preparing download...\n"+q.currentlyDownloadedEntry.sourceCodecInfo)
},
UpdateProgressPercentFunc: q.HandleProgressPercentUpdate,
}
// Download the file to SAVE_DIR
dlResult, err := downloader.DownloadAndConvertURL(qEntry.Ctx, qEntry.OrigMsg.Message, qEntry.Format)
if err != nil {
fmt.Println(" error downloading:", err)
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock()
q.currentlyDownloadedEntry.disableProgressPercentUpdate = true
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock()
qEntry.editReply(ctx, fmt.Sprint(errorStr+": ", err))
return
}
fmt.Printf(" saved to %s (size: %s)\n", dlResult.FilePath, humanize.Bytes(uint64(dlResult.FileSize)))
// Probe the downloaded file to check codec compatibility
conv := Converter{
Format: qEntry.Format,
UpdateProgressPercentCallback: q.HandleProgressPercentUpdate,
}
if err := conv.ProbeFile(dlResult.FilePath); err != nil {
fmt.Println(" error probing file:", err)
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock()
q.currentlyDownloadedEntry.disableProgressPercentUpdate = true
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock()
qEntry.editReply(ctx, fmt.Sprint(errorStr+": ", err))
return
}
// Update codec info in UI
if downloader.ConvertStartFunc != nil {
downloader.ConvertStartFunc(ctx, conv.VideoCodecs, conv.AudioCodecs, conv.GetActionsNeeded())
}
// Check if file is small enough to upload to Telegram (<512MB)
if dlResult.FileSize >= telegramUploadThreshold {
// File too large, only save to disk
fmt.Printf(" file too large (%s >= 512MB), skipping Telegram upload\n", humanize.Bytes(uint64(dlResult.FileSize)))
qEntry.editReply(ctx, fmt.Sprintf("✅ Saved to server\n📁 %s\n💾 Size: %s\n⚠ File too large for Telegram upload (>512MB)",
dlResult.FilePath, humanize.Bytes(uint64(dlResult.FileSize))))
qEntry.sendTypingCancelAction(ctx)
return
}
// File is small enough, process for upload
fmt.Println(" processing for upload...")
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock()
q.updateProgress(ctx, qEntry, processStr, q.currentlyDownloadedEntry.lastProgressPercent)
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock()
// Convert if needed, then upload
uploadPath := dlResult.FilePath
uploadFormat := "mkv"
if qEntry.Format == "mp3" {
uploadFormat = "mp3"
}
// For video format, determine the actual format from filename
if qEntry.Format != "mp3" {
ext := filepath.Ext(uploadPath)
if ext != "" {
uploadFormat = ext[1:] // Remove the leading dot
}
}
if conv.NeedConvert() {
// Need conversion
outputPath, outputFormat, err := conv.ConvertIfNeeded(qEntry.Ctx, dlResult.FilePath, params.SaveDir)
if err != nil {
fmt.Println(" error converting:", err)
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock()
q.currentlyDownloadedEntry.disableProgressPercentUpdate = true
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock()
qEntry.editReply(ctx, fmt.Sprint(errorStr+": ", err))
return
}
uploadPath = outputPath
uploadFormat = outputFormat
// Keep both original and converted files
}
// Open file for upload
file, err := os.Open(uploadPath)
if err != nil {
fmt.Println(" error opening file:", err)
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock()
q.currentlyDownloadedEntry.disableProgressPercentUpdate = true
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock()
qEntry.editReply(ctx, fmt.Sprint(errorStr+": ", err))
return
}
defer file.Close()
// Upload to Telegram with video dimensions
err = dlUploader.UploadFile(qEntry.Ctx, qEntry.OrigEntities, qEntry.OrigMsgUpdate, file, uploadFormat, dlResult.Title, conv.VideoWidth, conv.VideoHeight)
if err != nil {
fmt.Println(" error uploading:", err)
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock()
q.currentlyDownloadedEntry.disableProgressPercentUpdate = true
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock()
file.Close()
qEntry.editReply(ctx, fmt.Sprint(errorStr+": ", err))
return
}
file.Close()
// Remove the uploaded file (since it's saved in SAVE_DIR, we keep it only if needed)
// Actually, we keep the file in SAVE_DIR as requested
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock()
q.currentlyDownloadedEntry.disableProgressPercentUpdate = true
if qEntry.Canceled {
fmt.Print(" canceled\n")
q.updateProgress(ctx, qEntry, canceledStr, q.currentlyDownloadedEntry.lastProgressPercent)
} else if q.currentlyDownloadedEntry.lastDisplayedProgressPercent < 100 {
fmt.Print(" progress: 100%\n")
q.updateProgress(ctx, qEntry, uploadDoneStr, 100)
}
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock()
qEntry.sendTypingCancelAction(ctx)
}
func (q *DownloadQueue) processor() {
for {
q.mutex.Lock()
if (len(q.entries)) == 0 {
q.mutex.Unlock()
<-q.processReqChan
continue
}
// Updating queue positions for all waiting entries.
for i := 1; i < len(q.entries); i++ {
q.entries[i].editReply(q.ctx, q.getQueuePositionString(i))
q.entries[i].sendTypingCancelAction(q.ctx)
}
q.entries[0].Ctx, q.entries[0].CtxCancel = context.WithTimeout(q.ctx, downloadAndConvertTimeout)
qEntry := &q.entries[0]
q.mutex.Unlock()
q.currentlyDownloadedEntry = currentlyDownloadedEntryType{}
q.processQueueEntry(q.ctx, qEntry)
q.mutex.Lock()
q.entries[0].CtxCancel()
q.entries = q.entries[1:]
if len(q.entries) == 0 {
fmt.Print("finished queue processing\n")
}
q.mutex.Unlock()
}
}
func (q *DownloadQueue) Init(ctx context.Context) {
q.ctx = ctx
q.processReqChan = make(chan bool)
go q.processor()
}