// yt-dlp-telegram-bot/queue.go
// 2023-08-15 09:40:25 +02:00
//
// 300 lines
// 9.5 KiB
// Go

package main
import (
"context"
"fmt"
"sync"
"time"
"github.com/gotd/td/telegram/message"
"github.com/gotd/td/tg"
)
// Status texts shown in the reply message while a request is queued,
// processed, uploaded, failed or canceled.
const (
	processStartStr = "🔍 Getting information..."
	processStr      = "🔨 Processing"
	uploadStr       = "☁️ Uploading"
	uploadDoneStr   = "🏁 Uploading"
	errorStr        = "❌ Error"
	canceledStr     = "❌ Canceled"
)

// Progress display tuning.
const (
	// maxProgressPercentUpdateInterval is the time HandleProgressPercentUpdate
	// keeps between two consecutive progress edits of the reply message.
	maxProgressPercentUpdateInterval = time.Second

	// progressBarLength is the length handed to getProgressbar when the
	// progress bar is rendered.
	progressBarLength = 10
)
// DownloadQueueEntry is one request in the DownloadQueue: the requested URL,
// the Telegram update it came from, the status reply that gets edited while
// the request is handled, and the context used to cancel it.
type DownloadQueueEntry struct {
	// URL is the url value passed to DownloadQueue.Add.
	URL string

	// The update that triggered the request and the message it carried.
	OrigEntities  tg.Entities
	OrigMsgUpdate *tg.UpdateNewMessage
	OrigMsg       *tg.Message

	// Source of the original message, as returned by resolveMsgSrc.
	FromUser  *tg.PeerUser
	FromGroup *tg.PeerChat

	// Reply is the builder the status message was sent with; ReplyMsg is the
	// sent status message, whose ID editReply uses for later edits.
	Reply    *message.Builder
	ReplyMsg *tg.UpdateShortSentMessage

	// Ctx and CtxCancel are assigned by the processor right before the entry
	// is processed; both are unset while the entry is only waiting.
	Ctx       context.Context
	CtxCancel context.CancelFunc

	// Canceled is set by CancelCurrentEntry.
	Canceled bool
}
// func (e *DownloadQueueEntry) getTypingActionDst() tg.InputPeerClass {
// if e.FromGroup != nil {
// return &tg.InputPeerChat{
// ChatID: e.FromGroup.ChatID,
// }
// }
// return &tg.InputPeerUser{
// UserID: e.FromUser.UserID,
// }
// }
// sendTypingAction is currently a no-op: the call that would send a typing
// action is commented out, so ctx is unused. The method is kept so callers
// such as editReply do not have to change if it is re-enabled.
func (e *DownloadQueueEntry) sendTypingAction(ctx context.Context) {
// _ = telegramSender.To(e.getTypingActionDst()).TypingAction().Typing(ctx)
}
// sendTypingCancelAction is currently a no-op: the call that would cancel a
// typing action is commented out, so ctx is unused.
func (e *DownloadQueueEntry) sendTypingCancelAction(ctx context.Context) {
// _ = telegramSender.To(e.getTypingActionDst()).TypingAction().Cancel(ctx)
}
// editReply replaces the text of the entry's status message (ReplyMsg) with s
// and then calls sendTypingAction. The edit is best effort: its result and
// error are discarded.
func (e *DownloadQueueEntry) editReply(ctx context.Context, s string) {
	statusMsg := e.Reply.Edit(e.ReplyMsg.ID)
	_, _ = statusMsg.Text(ctx, s)

	e.sendTypingAction(ctx)
}
// currentlyDownloadedEntryType is the progress-reporting state of the queue
// entry that is being processed right now.
type currentlyDownloadedEntryType struct {
	// disableProgressPercentUpdate makes HandleProgressPercentUpdate and any
	// pending timer callback skip further progress edits once it is true.
	disableProgressPercentUpdate bool

	// progressPercentUpdateMutex is held by HandleProgressPercentUpdate, its
	// timer callback and processQueueEntry while they touch the progress state.
	progressPercentUpdateMutex sync.Mutex

	// lastProgressPercentUpdateAt is when HandleProgressPercentUpdate last
	// pushed a progress edit; it is used to space edits out.
	lastProgressPercentUpdateAt time.Time

	// lastProgressPercent is the most recently reported percentage;
	// lastDisplayedProgressPercent is the last positive percentage that
	// updateProgress actually rendered.
	lastProgressPercent, lastDisplayedProgressPercent int

	// progressUpdateTimer is the timer of a deferred progress edit, if any.
	progressUpdateTimer *time.Timer

	// sourceCodecInfo is the codec summary line built in processQueueEntry.
	sourceCodecInfo string

	// progressInfo is extra text appended after the progress bar.
	// NOTE(review): nothing in this part of the file assigns it — confirm
	// where it is set.
	progressInfo string
}
// DownloadQueue is a FIFO of download requests that are processed one at a
// time by the processor goroutine started in Init.
type DownloadQueue struct {
	// ctx is the context given to Init; it is used for status edits and is
	// the parent of every entry's processing context.
	ctx context.Context

	// mutex is held by Add, CancelCurrentEntry and processor while they read
	// or modify entries. entries[0] is the entry being processed, the rest
	// are waiting.
	mutex   sync.Mutex
	entries []DownloadQueueEntry

	// processReqChan is signalled by Add to wake an idle processor.
	processReqChan chan bool

	// currentlyDownloadedEntry is the progress state of entries[0]; the
	// processor resets it before each entry.
	currentlyDownloadedEntry currentlyDownloadedEntryType
}
// getQueuePositionString returns the status text for a request waiting at
// queue position pos.
func (q *DownloadQueue) getQueuePositionString(pos int) string {
	return fmt.Sprintf("👨‍👦‍👦 Request queued at position #%d", pos)
}
// Add queues a download request for url that arrived with update u.
//
// It replies to the original message with a status text (either the
// "getting information" text when the queue is empty, or the queue position),
// appends the new entry and then wakes the processor.
//
// The request is dropped, with a log line, instead of crashing the bot when
//   - u does not carry a regular *tg.Message,
//   - the status reply can't be sent, or
//   - the send result is not a *tg.UpdateShortSentMessage, in which case the
//     reply can't be edited later and the user is told so on a best-effort
//     basis.
func (q *DownloadQueue) Add(ctx context.Context, entities tg.Entities, u *tg.UpdateNewMessage, url string) {
	// Checked before taking the lock: a failed assertion must never panic
	// while q.mutex is held.
	origMsg, ok := u.Message.(*tg.Message)
	if !ok {
		fmt.Printf(" can't queue request: unexpected message type %T\n", u.Message)
		return
	}

	// The lock is held across the reply so that the announced queue position
	// matches the position the entry actually gets.
	q.mutex.Lock()

	replyStr := processStartStr
	if pos := len(q.entries); pos > 0 {
		fmt.Println(" queueing request at position #", pos)
		replyStr = q.getQueuePositionString(pos)
	}

	newEntry := DownloadQueueEntry{
		URL:           url,
		OrigEntities:  entities,
		OrigMsgUpdate: u,
		OrigMsg:       origMsg,
	}

	newEntry.Reply = telegramSender.Reply(entities, u)
	sent, err := newEntry.Reply.Text(ctx, replyStr)
	if err != nil {
		q.mutex.Unlock()
		fmt.Println(" can't queue request: sending reply failed:", err)
		return
	}
	replyMsg, ok := sent.(*tg.UpdateShortSentMessage)
	if !ok {
		q.mutex.Unlock()
		fmt.Printf(" can't queue request: unexpected reply type %T\n", sent)
		// Best effort only; there is nothing more to do if this fails too.
		_, _ = telegramSender.Reply(entities, u).Text(ctx, errorStr+": can't track the status message")
		return
	}
	newEntry.ReplyMsg = replyMsg

	newEntry.FromUser, newEntry.FromGroup = resolveMsgSrc(newEntry.OrigMsg)

	q.entries = append(q.entries, newEntry)
	q.mutex.Unlock()

	// Non-blocking wake-up: if the processor is busy it will find the new
	// entry by itself once it finishes the current one.
	select {
	case q.processReqChan <- true:
	default:
	}
}
// CancelCurrentEntry cancels the request at the head of the queue by marking
// it as canceled and cancelling its processing context.
//
// If the queue is empty, or the head entry has not been picked up by the
// processor yet (its CtxCancel is still nil), there is nothing to cancel and
// an error reply is sent to the message in u instead. url is not used.
func (q *DownloadQueue) CancelCurrentEntry(ctx context.Context, entities tg.Entities, u *tg.UpdateNewMessage, url string) {
	q.mutex.Lock()
	canceled := false
	// CtxCancel is only assigned by the processor; calling it while it is
	// still nil would panic.
	if len(q.entries) > 0 && q.entries[0].CtxCancel != nil {
		q.entries[0].Canceled = true
		q.entries[0].CtxCancel()
		canceled = true
	}
	q.mutex.Unlock()

	if canceled {
		return
	}

	// The reply is sent after releasing the lock so that a slow network call
	// does not block Add and the processor.
	fmt.Println(" no active request to cancel")
	_, _ = telegramSender.Reply(entities, u).Text(ctx, errorStr+": no active request to cancel")
}
// updateProgress edits qEntry's status message to show progressStr together
// with the current progress and the source codec line.
//
//   - progressPercent < 0: progress is reported as unavailable.
//   - progressPercent == 0: only the text and progressInfo are shown, no bar.
//   - otherwise: a progress bar is rendered, the percentage is logged and
//     remembered in lastDisplayedProgressPercent.
//
// The visible callers hold progressPercentUpdateMutex while calling this.
func (q *DownloadQueue) updateProgress(ctx context.Context, qEntry *DownloadQueueEntry, progressStr string, progressPercent int) {
	cur := &q.currentlyDownloadedEntry

	switch {
	case progressPercent < 0:
		qEntry.editReply(ctx, progressStr+"... (no progress available)\n"+cur.sourceCodecInfo)

	case progressPercent == 0:
		qEntry.editReply(ctx, progressStr+"..."+cur.progressInfo+"\n"+cur.sourceCodecInfo)

	default:
		fmt.Printf(" progress: %d%%\n", progressPercent)
		bar := getProgressbar(progressPercent, progressBarLength)
		qEntry.editReply(ctx, progressStr+": "+bar+cur.progressInfo+"\n"+cur.sourceCodecInfo)
		cur.lastDisplayedProgressPercent = progressPercent
	}
}
// HandleProgressPercentUpdate records a new progress value for the entry at
// the head of the queue and updates its status message, rate limited to one
// edit per maxProgressPercentUpdateInterval.
//
// Repeated values and values arriving after progress updates were disabled
// are ignored. A negative progressPercent means that no progress is
// available: it is displayed once and all further updates are disabled.
//
// If the previous edit happened less than maxProgressPercentUpdateInterval
// ago, the edit is deferred with a timer; a newer value replaces a pending
// deferred edit.
//
// NOTE(review): q.entries[0] is read here without holding q.mutex.
func (q *DownloadQueue) HandleProgressPercentUpdate(progressStr string, progressPercent int) {
	cur := &q.currentlyDownloadedEntry

	cur.progressPercentUpdateMutex.Lock()
	defer cur.progressPercentUpdateMutex.Unlock()

	if cur.disableProgressPercentUpdate || cur.lastProgressPercent == progressPercent {
		return
	}
	cur.lastProgressPercent = progressPercent

	if progressPercent < 0 {
		cur.disableProgressPercentUpdate = true
		q.updateProgress(q.ctx, &q.entries[0], progressStr, progressPercent)
		return
	}

	// Supersede a pending deferred edit. Stop can't cancel a callback that
	// has already been started, so the timer is also forgotten here; the
	// callback checks below that it still belongs to the current timer.
	// (Timers created by AfterFunc have no channel, so there is nothing to
	// drain.)
	if cur.progressUpdateTimer != nil {
		cur.progressUpdateTimer.Stop()
		cur.progressUpdateTimer = nil
	}

	elapsed := time.Since(cur.lastProgressPercentUpdateAt)
	if elapsed < maxProgressPercentUpdateInterval {
		var timer *time.Timer
		timer = time.AfterFunc(maxProgressPercentUpdateInterval-elapsed, func() {
			cur.progressPercentUpdateMutex.Lock()
			defer cur.progressPercentUpdateMutex.Unlock()

			// Skip if a newer update replaced this timer, if the state was
			// reset for another entry, or if updates were disabled meanwhile.
			if cur.progressUpdateTimer != timer || cur.disableProgressPercentUpdate {
				return
			}
			q.updateProgress(q.ctx, &q.entries[0], progressStr, progressPercent)
			cur.lastProgressPercentUpdateAt = time.Now()
		})
		// The callback can't observe the state before this assignment: it
		// needs the mutex, which is held until this function returns.
		cur.progressUpdateTimer = timer
		return
	}

	q.updateProgress(q.ctx, &q.entries[0], progressStr, progressPercent)
	cur.lastProgressPercentUpdateAt = time.Now()
}
// processQueueEntry handles a single queue entry: it downloads and converts
// the requested media, uploads the result and keeps the entry's status
// message up to date along the way.
//
// ctx is used for the status message edits, while the download and the upload
// run with qEntry.Ctx so that they can be canceled or time out. Errors are
// logged and shown in the status message; nothing is returned.
func (q *DownloadQueue) processQueueEntry(ctx context.Context, qEntry *DownloadQueueEntry) {
	cur := &q.currentlyDownloadedEntry

	// stopProgressUpdates makes HandleProgressPercentUpdate and any pending
	// deferred update ignore further progress reports.
	stopProgressUpdates := func() {
		cur.progressPercentUpdateMutex.Lock()
		cur.disableProgressPercentUpdate = true
		cur.progressPercentUpdateMutex.Unlock()
	}

	userID := qEntry.FromUser.UserID
	username := getFromUsername(qEntry.OrigEntities, userID)
	fmt.Print("processing request by")
	if username != "" {
		fmt.Print(" from ", username, "#", userID)
	}
	fmt.Println(":", qEntry.URL)

	qEntry.editReply(ctx, processStartStr)

	downloader := Downloader{
		ProbeStartFunc: func(ctx context.Context) {
			qEntry.editReply(ctx, "🎬 Getting video format...")
		},
		ConvertStartFunc: func(ctx context.Context, videoCodecs, audioCodecs, convertActionsNeeded string) {
			// Build the codec summary line that is shown below the progress
			// for the rest of the processing.
			info := "🎬 Source: " + videoCodecs
			if audioCodecs != "" {
				info += " / " + audioCodecs
			} else {
				info += ", no audio"
			}
			if convertActionsNeeded != "" {
				info += " (converting: " + convertActionsNeeded + ")"
			} else {
				info += " (no conversion needed)"
			}
			cur.sourceCodecInfo = info

			qEntry.editReply(ctx, "🎬 Preparing download...\n"+info)
		},
		UpdateProgressPercentFunc: q.HandleProgressPercentUpdate,
	}

	r, err := downloader.DownloadAndConvertURL(qEntry.Ctx, qEntry.OrigMsg.Message)
	if err != nil {
		fmt.Println(" error downloading:", err)
		stopProgressUpdates()
		qEntry.editReply(ctx, fmt.Sprint(errorStr+": ", err))
		return
	}

	// The reader returned by the downloader is handed to the uploader.
	fmt.Println(" processing...")

	cur.progressPercentUpdateMutex.Lock()
	q.updateProgress(ctx, qEntry, processStr, cur.lastProgressPercent)
	cur.progressPercentUpdateMutex.Unlock()

	if err = dlUploader.UploadFile(qEntry.Ctx, qEntry.OrigEntities, qEntry.OrigMsgUpdate, r); err != nil {
		fmt.Println(" error processing:", err)
		stopProgressUpdates()
		r.Close()
		qEntry.editReply(ctx, fmt.Sprint(errorStr+": ", err))
		return
	}

	stopProgressUpdates()
	r.Close()

	// Final status: either the cancellation, or a full progress bar if the
	// last displayed value did not reach 100% yet.
	cur.progressPercentUpdateMutex.Lock()
	switch {
	case qEntry.Canceled:
		fmt.Print(" canceled\n")
		q.updateProgress(ctx, qEntry, canceledStr, cur.lastProgressPercent)
	case cur.lastDisplayedProgressPercent < 100:
		fmt.Print(" progress: 100%\n")
		q.updateProgress(ctx, qEntry, uploadDoneStr, 100)
	}
	cur.progressPercentUpdateMutex.Unlock()

	qEntry.sendTypingCancelAction(ctx)
}
// processor is the queue's worker loop; Init starts it in its own goroutine.
//
// While the queue is empty it waits for a wake-up from Add and returns when
// the queue's context is done. Otherwise it refreshes the queue position
// shown to every waiting entry, gives the head entry a context limited by
// downloadAndConvertTimeout, resets the progress state, processes the entry
// and finally removes it from the queue.
//
// NOTE(review): qEntry points into q.entries; if Add's append reallocates the
// slice while the entry is processed, later writes to q.entries[0] (such as
// Canceled) are not visible through qEntry.
func (q *DownloadQueue) processor() {
	for {
		q.mutex.Lock()
		if len(q.entries) == 0 {
			q.mutex.Unlock()
			// Idle: wait for new work, but don't outlive the queue's context.
			select {
			case <-q.processReqChan:
			case <-q.ctx.Done():
				return
			}
			continue
		}

		// Updating queue positions for all waiting entries.
		for i := 1; i < len(q.entries); i++ {
			q.entries[i].editReply(q.ctx, q.getQueuePositionString(i))
			q.entries[i].sendTypingCancelAction(q.ctx)
		}

		q.entries[0].Ctx, q.entries[0].CtxCancel = context.WithTimeout(q.ctx, downloadAndConvertTimeout)
		qEntry := &q.entries[0]
		q.mutex.Unlock()

		// Reset the progress state for the new entry. This is done field by
		// field under the progress mutex instead of overwriting the whole
		// struct: a timer callback left over from the previous entry may be
		// holding or waiting for that mutex, and replacing a mutex that is in
		// use corrupts it. A still pending deferred update is stopped so it
		// can't be applied to the new entry.
		cur := &q.currentlyDownloadedEntry
		cur.progressPercentUpdateMutex.Lock()
		if cur.progressUpdateTimer != nil {
			cur.progressUpdateTimer.Stop()
		}
		cur.disableProgressPercentUpdate = false
		cur.lastProgressPercentUpdateAt = time.Time{}
		cur.lastProgressPercent = 0
		cur.lastDisplayedProgressPercent = 0
		cur.progressUpdateTimer = nil
		cur.sourceCodecInfo = ""
		cur.progressInfo = ""
		cur.progressPercentUpdateMutex.Unlock()

		q.processQueueEntry(q.ctx, qEntry)

		q.mutex.Lock()
		// Release the resources of the entry's timeout context.
		q.entries[0].CtxCancel()
		q.entries = q.entries[1:]
		if len(q.entries) == 0 {
			fmt.Print("finished queue processing\n")
		}
		q.mutex.Unlock()
	}
}
// Init prepares the queue for use and starts its processor goroutine.
// ctx is stored and used for status edits and as the parent of every entry's
// processing context.
func (q *DownloadQueue) Init(ctx context.Context) {
	q.ctx = ctx
	// One slot of buffering keeps a wake-up from being lost: Add signals with
	// a non-blocking send, which would be dropped on an unbuffered channel if
	// the processor has seen an empty queue but has not reached its receive
	// yet, leaving the new entry unprocessed until the next Add.
	q.processReqChan = make(chan bool, 1)
	go q.processor()
}