diff --git a/README.md b/README.md index a6e6643..c0d6445 100644 --- a/README.md +++ b/README.md @@ -12,10 +12,9 @@ processed at a time. The bot uses the [Telegram MTProto API](https://github.com/gotd/td), which supports larger video uploads than the default 50MB with the standard -Telegram bot API. Videos are not saved on disk, they are simultaneously -uploaded from the source to Telegram. Incompatible video and audio streams -are automatically converted to match those which are supported by Telegram's -built-in video player. +Telegram bot API. Videos are not saved on disk. Incompatible video and audio +streams are automatically converted to match those which are supported by +Telegram's built-in video player. The only dependencies are [yt-dlp](https://github.com/yt-dlp/yt-dlp) and [ffmpeg](https://github.com/FFmpeg/FFmpeg). Tested on Linux, but should be diff --git a/convert.go b/convert.go index 8d0a3c3..8491a48 100644 --- a/convert.go +++ b/convert.go @@ -158,11 +158,11 @@ func (c *Converter) ffmpegProgressSock() (sockFilename string, sock net.Listener if len(a) > 0 && len(a[len(a)-1]) > 0 { data = "" l, _ := strconv.Atoi(a[len(a)-1][len(a[len(a)-1])-1]) - c.UpdateProgressPercentCallback(int(100 * float64(l) / c.Duration / 1000000)) + c.UpdateProgressPercentCallback(processStr, int(100*float64(l)/c.Duration/1000000)) } if strings.Contains(data, "progress=end") { - c.UpdateProgressPercentCallback(100) + c.UpdateProgressPercentCallback(processStr, 100) } } }() @@ -217,7 +217,7 @@ func (c *Converter) ConvertIfNeeded(ctx context.Context, rr *ReReadCloser) (io.R ff = ff.GlobalArgs("-progress", "unix:"+progressSockFilename) } } else { - c.UpdateProgressPercentCallback(-1) + c.UpdateProgressPercentCallback(processStr, -1) } } diff --git a/dl.go b/dl.go index 9b9ee3c..f29d841 100644 --- a/dl.go +++ b/dl.go @@ -13,7 +13,7 @@ const downloadAndConvertTimeout = 5 * time.Minute type ProbeStartCallbackFunc func(ctx context.Context) type ConvertStartCallbackFunc func(ctx context.Context, videoCodecs, audioCodecs, convertActionsNeeded string) -type UpdateProgressPercentCallbackFunc func(progressPercent int) +type UpdateProgressPercentCallbackFunc func(progressStr string, progressPercent int) type Downloader struct { ProbeStartFunc ProbeStartCallbackFunc diff --git a/go.mod b/go.mod index 3165cd6..2d7bb86 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.20 replace github.com/wader/goutubedl => github.com/nonoo/goutubedl v0.0.0-20230814114826-c1dcced79138 require ( + github.com/dustin/go-humanize v1.0.1 github.com/google/go-github/v53 v53.2.0 github.com/gotd/td v0.84.0 github.com/u2takey/ffmpeg-go v0.5.0 diff --git a/go.sum b/go.sum index 214148d..98917f3 100644 --- a/go.sum +++ b/go.sum @@ -13,6 +13,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/disintegration/imaging v1.6.2/go.mod h1:44/5580QXChDfwIclfc/PCwrr44amcmDAg8hxG0Ewe4= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= diff --git a/main.go b/main.go index ec863aa..52188f6 100644 --- a/main.go +++ b/main.go @@ -3,7 +3,6 @@ package main import ( "context" "fmt" - "io" "net" "net/url" "os" @@ -22,23 +21,6 @@ var dlQueue DownloadQueue var telegramUploader *uploader.Uploader var telegramSender *message.Sender -func uploadFile(ctx context.Context, entities tg.Entities, u *tg.UpdateNewMessage, f io.ReadCloser) error { - upload, err := telegramUploader.FromReader(ctx, "yt-dlp", f) - if err != nil { - return fmt.Errorf("uploading %w", err) - } - - // Now we have uploaded file handle, sending it as styled message. First, preparing message. - document := message.UploadedDocument(upload).Video() - - // Sending message with media. - if _, err := telegramSender.Answer(entities, u).Media(ctx, document); err != nil { - return fmt.Errorf("send: %w", err) - } - - return nil -} - func handleCmdDLP(ctx context.Context, entities tg.Entities, u *tg.UpdateNewMessage, msg *tg.Message) { // Check if message is an URL. validURI := true @@ -155,7 +137,7 @@ func main() { api := client.API() - telegramUploader = uploader.NewUploader(api) + telegramUploader = uploader.NewUploader(api).WithProgress(dlUploader) telegramSender = message.NewSender(api).WithUploader(telegramUploader) dlQueue.Init(ctx) diff --git a/queue.go b/queue.go index 9b3b97d..c20aeb8 100644 --- a/queue.go +++ b/queue.go @@ -12,7 +12,8 @@ import ( const processStartStr = "🔍 Getting information..." const processStr = "🔨 Processing" -const processDoneStr = "🏁 Processing" +const uploadStr = "☁️ Uploading" +const uploadDoneStr = "🏁 Uploading" const errorStr = "❌ Error" const canceledStr = "❌ Canceled" @@ -60,10 +61,26 @@ func (e *DownloadQueueEntry) editReply(ctx context.Context, s string) { e.sendTypingAction(ctx) } +type currentlyDownloadedEntryType struct { + disableProgressPercentUpdate bool + progressPercentUpdateMutex sync.Mutex + lastProgressPercentUpdateAt time.Time + lastProgressPercent int + lastDisplayedProgressPercent int + progressUpdateTimer *time.Timer + + sourceCodecInfo string + progressInfo string +} + type DownloadQueue struct { + ctx context.Context + mutex sync.Mutex entries []DownloadQueueEntry processReqChan chan bool + + currentlyDownloadedEntry currentlyDownloadedEntryType } func (e *DownloadQueue) getQueuePositionString(pos int) string { @@ -115,17 +132,56 @@ func (q *DownloadQueue) CancelCurrentEntry(ctx context.Context, entities tg.Enti q.mutex.Unlock() } -func (q *DownloadQueue) updateProgress(ctx context.Context, qEntry *DownloadQueueEntry, progressPercent int, sourceCodecInfo string) { +func (q *DownloadQueue) updateProgress(ctx context.Context, qEntry *DownloadQueueEntry, progressStr string, progressPercent int) { if progressPercent < 0 { - qEntry.editReply(ctx, processStr+"... (no progress available)\n"+sourceCodecInfo) + qEntry.editReply(ctx, progressStr+"... (no progress available)\n"+q.currentlyDownloadedEntry.sourceCodecInfo) return } if progressPercent == 0 { - qEntry.editReply(ctx, processStr+"...\n"+sourceCodecInfo) + qEntry.editReply(ctx, progressStr+"..."+q.currentlyDownloadedEntry.progressInfo+"\n"+q.currentlyDownloadedEntry.sourceCodecInfo) return } fmt.Print(" progress: ", progressPercent, "%\n") - qEntry.editReply(ctx, processStr+": "+getProgressbar(progressPercent, progressBarLength)+"\n"+sourceCodecInfo) + qEntry.editReply(ctx, progressStr+": "+getProgressbar(progressPercent, progressBarLength)+q.currentlyDownloadedEntry.progressInfo+"\n"+q.currentlyDownloadedEntry.sourceCodecInfo) + q.currentlyDownloadedEntry.lastDisplayedProgressPercent = progressPercent +} + +func (q *DownloadQueue) HandleProgressPercentUpdate(progressStr string, progressPercent int) { + q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock() + defer q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock() + + if q.currentlyDownloadedEntry.disableProgressPercentUpdate || q.currentlyDownloadedEntry.lastProgressPercent == progressPercent { + return + } + q.currentlyDownloadedEntry.lastProgressPercent = progressPercent + if progressPercent < 0 { + q.currentlyDownloadedEntry.disableProgressPercentUpdate = true + q.updateProgress(q.ctx, &q.entries[0], progressStr, progressPercent) + return + } + + if q.currentlyDownloadedEntry.progressUpdateTimer != nil { + q.currentlyDownloadedEntry.progressUpdateTimer.Stop() + select { + case <-q.currentlyDownloadedEntry.progressUpdateTimer.C: + default: + } + } + + timeElapsedSinceLastUpdate := time.Since(q.currentlyDownloadedEntry.lastProgressPercentUpdateAt) + if timeElapsedSinceLastUpdate < maxProgressPercentUpdateInterval { + q.currentlyDownloadedEntry.progressUpdateTimer = time.AfterFunc(maxProgressPercentUpdateInterval-timeElapsedSinceLastUpdate, func() { + q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock() + if !q.currentlyDownloadedEntry.disableProgressPercentUpdate { + q.updateProgress(q.ctx, &q.entries[0], progressStr, progressPercent) + q.currentlyDownloadedEntry.lastProgressPercentUpdateAt = time.Now() + } + q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock() + }) + return + } + q.updateProgress(q.ctx, &q.entries[0], progressStr, progressPercent) + q.currentlyDownloadedEntry.lastProgressPercentUpdateAt = time.Now() } func (q *DownloadQueue) processQueueEntry(ctx context.Context, qEntry *DownloadQueueEntry) { @@ -138,109 +194,71 @@ func (q *DownloadQueue) processQueueEntry(ctx context.Context, qEntry *DownloadQ qEntry.editReply(ctx, processStartStr) - var disableProgressPercentUpdate bool - var progressPercentUpdateMutex sync.Mutex - var lastProgressPercentUpdateAt time.Time - var lastProgressPercent int - var progressUpdateTimer *time.Timer - var sourceCodecInfo string downloader := Downloader{ ProbeStartFunc: func(ctx context.Context) { qEntry.editReply(ctx, "🎬 Getting video format...") }, ConvertStartFunc: func(ctx context.Context, videoCodecs, audioCodecs, convertActionsNeeded string) { - sourceCodecInfo = "🎬 Source: " + videoCodecs + q.currentlyDownloadedEntry.sourceCodecInfo = "🎬 Source: " + videoCodecs if audioCodecs == "" { - sourceCodecInfo += ", no audio" + q.currentlyDownloadedEntry.sourceCodecInfo += ", no audio" } else { - sourceCodecInfo += " / " + audioCodecs + q.currentlyDownloadedEntry.sourceCodecInfo += " / " + audioCodecs } if convertActionsNeeded == "" { - sourceCodecInfo += " (no conversion needed)" + q.currentlyDownloadedEntry.sourceCodecInfo += " (no conversion needed)" } else { - sourceCodecInfo += " (converting: " + convertActionsNeeded + ")" + q.currentlyDownloadedEntry.sourceCodecInfo += " (converting: " + convertActionsNeeded + ")" } - qEntry.editReply(ctx, "🎬 Preparing download...\n"+sourceCodecInfo) - }, - UpdateProgressPercentFunc: func(progressPercent int) { - progressPercentUpdateMutex.Lock() - defer progressPercentUpdateMutex.Unlock() - - if disableProgressPercentUpdate || lastProgressPercent == progressPercent { - return - } - lastProgressPercent = progressPercent - if progressPercent < 0 { - disableProgressPercentUpdate = true - q.updateProgress(ctx, qEntry, progressPercent, sourceCodecInfo) - return - } - - if progressUpdateTimer != nil { - progressUpdateTimer.Stop() - select { - case <-progressUpdateTimer.C: - default: - } - } - - timeElapsedSinceLastUpdate := time.Since(lastProgressPercentUpdateAt) - if timeElapsedSinceLastUpdate < maxProgressPercentUpdateInterval { - progressUpdateTimer = time.AfterFunc(maxProgressPercentUpdateInterval-timeElapsedSinceLastUpdate, func() { - q.updateProgress(ctx, qEntry, progressPercent, sourceCodecInfo) - lastProgressPercentUpdateAt = time.Now() - }) - return - } - q.updateProgress(ctx, qEntry, progressPercent, sourceCodecInfo) - lastProgressPercentUpdateAt = time.Now() + qEntry.editReply(ctx, "🎬 Preparing download...\n"+q.currentlyDownloadedEntry.sourceCodecInfo) }, + UpdateProgressPercentFunc: q.HandleProgressPercentUpdate, } r, err := downloader.DownloadAndConvertURL(qEntry.Ctx, qEntry.OrigMsg.Message) if err != nil { fmt.Println(" error downloading:", err) - progressPercentUpdateMutex.Lock() - disableProgressPercentUpdate = true - progressPercentUpdateMutex.Unlock() + q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock() + q.currentlyDownloadedEntry.disableProgressPercentUpdate = true + q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock() qEntry.editReply(ctx, fmt.Sprint(errorStr+": ", err)) return } // Feeding the returned io.ReadCloser to the uploader. fmt.Println(" processing...") - progressPercentUpdateMutex.Lock() - q.updateProgress(ctx, qEntry, lastProgressPercent, sourceCodecInfo) - progressPercentUpdateMutex.Unlock() + q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock() + q.updateProgress(ctx, qEntry, processStr, q.currentlyDownloadedEntry.lastProgressPercent) + q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock() - err = uploadFile(ctx, qEntry.OrigEntities, qEntry.OrigMsgUpdate, r) + err = dlUploader.UploadFile(qEntry.Ctx, qEntry.OrigEntities, qEntry.OrigMsgUpdate, r) if err != nil { fmt.Println(" error processing:", err) - progressPercentUpdateMutex.Lock() - disableProgressPercentUpdate = true - progressPercentUpdateMutex.Unlock() + q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock() + q.currentlyDownloadedEntry.disableProgressPercentUpdate = true + q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock() r.Close() qEntry.editReply(ctx, fmt.Sprint(errorStr+": ", err)) return } - progressPercentUpdateMutex.Lock() - disableProgressPercentUpdate = true - progressPercentUpdateMutex.Unlock() + q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock() + q.currentlyDownloadedEntry.disableProgressPercentUpdate = true + q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock() r.Close() - progressPercentUpdateMutex.Lock() + q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock() if qEntry.Canceled { fmt.Print(" canceled\n") - qEntry.editReply(ctx, canceledStr+": "+getProgressbar(lastProgressPercent, progressBarLength)+"\n"+sourceCodecInfo) - } else if lastProgressPercent < 100 { + q.updateProgress(ctx, qEntry, canceledStr, q.currentlyDownloadedEntry.lastProgressPercent) + } else if q.currentlyDownloadedEntry.lastDisplayedProgressPercent < 100 { fmt.Print(" progress: 100%\n") - qEntry.editReply(ctx, processDoneStr+": "+getProgressbar(100, progressBarLength)+"\n"+sourceCodecInfo) + q.updateProgress(ctx, qEntry, uploadDoneStr, 100) } - progressPercentUpdateMutex.Unlock() + q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock() qEntry.sendTypingCancelAction(ctx) } -func (q *DownloadQueue) processor(ctx context.Context) { +func (q *DownloadQueue) processor() { for { q.mutex.Lock() if (len(q.entries)) == 0 { @@ -251,16 +269,18 @@ func (q *DownloadQueue) processor(ctx context.Context) { // Updating queue positions for all waiting entries. for i := 1; i < len(q.entries); i++ { - q.entries[i].editReply(ctx, q.getQueuePositionString(i)) - q.entries[i].sendTypingCancelAction(ctx) + q.entries[i].editReply(q.ctx, q.getQueuePositionString(i)) + q.entries[i].sendTypingCancelAction(q.ctx) } - q.entries[0].Ctx, q.entries[0].CtxCancel = context.WithTimeout(ctx, downloadAndConvertTimeout) + q.entries[0].Ctx, q.entries[0].CtxCancel = context.WithTimeout(q.ctx, downloadAndConvertTimeout) qEntry := &q.entries[0] q.mutex.Unlock() - q.processQueueEntry(ctx, qEntry) + q.currentlyDownloadedEntry = currentlyDownloadedEntryType{} + + q.processQueueEntry(q.ctx, qEntry) q.mutex.Lock() q.entries[0].CtxCancel() @@ -273,6 +293,7 @@ func (q *DownloadQueue) processor(ctx context.Context) { } func (q *DownloadQueue) Init(ctx context.Context) { + q.ctx = ctx q.processReqChan = make(chan bool) - go q.processor(ctx) + go q.processor() } diff --git a/upload.go b/upload.go new file mode 100644 index 0000000..9b028d4 --- /dev/null +++ b/upload.go @@ -0,0 +1,57 @@ +package main + +import ( + "bytes" + "context" + "fmt" + "io" + "math/big" + + "github.com/dustin/go-humanize" + "github.com/gotd/td/telegram/message" + "github.com/gotd/td/telegram/uploader" + "github.com/gotd/td/tg" +) + +type Uploader struct{} + +var dlUploader Uploader + +func (p Uploader) Chunk(ctx context.Context, state uploader.ProgressState) error { + dlQueue.HandleProgressPercentUpdate(uploadStr, int(state.Uploaded*100/state.Total)) + return nil +} + +func (p *Uploader) UploadFile(ctx context.Context, entities tg.Entities, u *tg.UpdateNewMessage, f io.ReadCloser) error { + // Reading to a buffer first, because we don't know the file size. + var buf bytes.Buffer + for { + b := make([]byte, 1024) + n, err := f.Read(b) + if err != nil && err != io.EOF { + return fmt.Errorf("reading to buffer error: %w", err) + } + if n == 0 { + break + } + buf.Write(b[:n]) + } + + fmt.Println(" got", buf.Len(), "bytes, uploading...") + dlQueue.currentlyDownloadedEntry.progressInfo = fmt.Sprint(" (", humanize.BigBytes(big.NewInt(int64(buf.Len()))), ")") + + upload, err := telegramUploader.FromBytes(ctx, "yt-dlp", buf.Bytes()) + if err != nil { + return fmt.Errorf("uploading %w", err) + } + + // Now we have uploaded file handle, sending it as styled message. First, preparing message. + document := message.UploadedDocument(upload).Video() + + // Sending message with media. + if _, err := telegramSender.Answer(entities, u).Media(ctx, document); err != nil { + return fmt.Errorf("send: %w", err) + } + + return nil +}