546 lines
17 KiB
Go
546 lines
17 KiB
Go
package main
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"os"
|
||
"path/filepath"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/dustin/go-humanize"
|
||
"github.com/gotd/td/telegram/message"
|
||
"github.com/gotd/td/tg"
|
||
)
|
||
|
||
// Status texts used as prefixes or full bodies of the bot's reply message.
const (
	processStartStr = "🔍 Getting information..."
	processStr      = "🔨 Processing"
	uploadStr       = "☁️ Uploading"
	uploadDoneStr   = "🏁 Uploading"
	errorStr        = "❌ Error"
	canceledStr     = "❌ Canceled"
)

// Progress display settings.
const (
	// maxProgressPercentUpdateInterval is the time that must have elapsed
	// since the last progress edit before the next one is sent immediately;
	// earlier updates are deferred with a timer.
	maxProgressPercentUpdateInterval = time.Second

	// progressBarLength is the bar length passed to getProgressbar.
	progressBarLength = 10
)
|
||
|
||
type DownloadQueueEntry struct {
|
||
URL string
|
||
Format string
|
||
|
||
// IsVideoMessage is true if this entry is for a video message download
|
||
IsVideoMessage bool
|
||
// VideoDocument stores the video document info for video messages
|
||
VideoDocument *tg.Document
|
||
|
||
OrigEntities tg.Entities
|
||
OrigMsgUpdate *tg.UpdateNewMessage
|
||
OrigMsg *tg.Message
|
||
FromUser *tg.PeerUser
|
||
FromGroup *tg.PeerChat
|
||
|
||
Reply *message.Builder
|
||
ReplyMsg *tg.UpdateShortSentMessage
|
||
|
||
Ctx context.Context
|
||
CtxCancel context.CancelFunc
|
||
Canceled bool
|
||
}
|
||
|
||
func (e *DownloadQueueEntry) sendTypingAction(ctx context.Context) {
|
||
}
|
||
|
||
func (e *DownloadQueueEntry) sendTypingCancelAction(ctx context.Context) {
|
||
}
|
||
|
||
func (e *DownloadQueueEntry) editReply(ctx context.Context, s string) {
|
||
_, _ = e.Reply.Edit(e.ReplyMsg.ID).Text(ctx, s)
|
||
e.sendTypingAction(ctx)
|
||
}
|
||
|
||
// currentlyDownloadedEntryType is the progress-reporting state of the entry
// that is being processed right now.
type currentlyDownloadedEntryType struct {
	// disableProgressPercentUpdate, once set, makes incoming progress updates
	// and pending timer callbacks return without editing the reply.
	disableProgressPercentUpdate bool
	// progressPercentUpdateMutex is held while progress updates are handled.
	progressPercentUpdateMutex sync.Mutex
	// lastProgressPercentUpdateAt is when the reply was last edited with a
	// progress value.
	lastProgressPercentUpdateAt time.Time
	// lastProgressPercent is the most recently reported percentage.
	lastProgressPercent int
	// lastDisplayedProgressPercent is the last percentage rendered as a bar.
	lastDisplayedProgressPercent int
	// progressUpdateTimer is the pending deferred progress edit, if any.
	progressUpdateTimer *time.Timer

	// sourceCodecInfo and progressInfo are appended to progress messages.
	sourceCodecInfo string
	progressInfo    string
}
|
||
|
||
type DownloadQueue struct {
|
||
ctx context.Context
|
||
|
||
mutex sync.Mutex
|
||
entries []DownloadQueueEntry
|
||
processReqChan chan bool
|
||
|
||
currentlyDownloadedEntry currentlyDownloadedEntryType
|
||
}
|
||
|
||
func (e *DownloadQueue) getQueuePositionString(pos int) string {
|
||
return "👨👦👦 Request queued at position #" + fmt.Sprint(pos)
|
||
}
|
||
|
||
func (q *DownloadQueue) Add(ctx context.Context, entities tg.Entities, u *tg.UpdateNewMessage, url, format string) {
|
||
q.addEntry(ctx, entities, u, url, format, false, nil)
|
||
}
|
||
|
||
// AddVideo adds a video message download to the queue
|
||
func (q *DownloadQueue) AddVideo(ctx context.Context, entities tg.Entities, u *tg.UpdateNewMessage, videoDocument *tg.Document) {
|
||
q.addEntry(ctx, entities, u, "", "", true, videoDocument)
|
||
}
|
||
|
||
func (q *DownloadQueue) addEntry(ctx context.Context, entities tg.Entities, u *tg.UpdateNewMessage, url, format string, isVideo bool, videoDocument *tg.Document) {
|
||
q.mutex.Lock()
|
||
|
||
var replyStr string
|
||
if len(q.entries) == 0 {
|
||
if isVideo {
|
||
replyStr = "⬇️ Downloading video..."
|
||
} else {
|
||
replyStr = processStartStr
|
||
}
|
||
} else {
|
||
fmt.Println(" queueing request at position #", len(q.entries))
|
||
replyStr = q.getQueuePositionString(len(q.entries))
|
||
}
|
||
|
||
newEntry := DownloadQueueEntry{
|
||
URL: url,
|
||
Format: format,
|
||
IsVideoMessage: isVideo,
|
||
VideoDocument: videoDocument,
|
||
OrigEntities: entities,
|
||
OrigMsgUpdate: u,
|
||
OrigMsg: u.Message.(*tg.Message),
|
||
}
|
||
|
||
newEntry.Reply = telegramSender.Reply(entities, u)
|
||
replyText, _ := newEntry.Reply.Text(ctx, replyStr)
|
||
newEntry.ReplyMsg = replyText.(*tg.UpdateShortSentMessage)
|
||
|
||
newEntry.FromUser, newEntry.FromGroup = resolveMsgSrc(newEntry.OrigMsg)
|
||
|
||
q.entries = append(q.entries, newEntry)
|
||
q.mutex.Unlock()
|
||
|
||
select {
|
||
case q.processReqChan <- true:
|
||
default:
|
||
}
|
||
}
|
||
|
||
func (q *DownloadQueue) CancelCurrentEntry(ctx context.Context, entities tg.Entities, u *tg.UpdateNewMessage, url string) {
|
||
q.mutex.Lock()
|
||
if len(q.entries) > 0 {
|
||
q.entries[0].Canceled = true
|
||
q.entries[0].CtxCancel()
|
||
} else {
|
||
fmt.Println(" no active request to cancel")
|
||
_, _ = telegramSender.Reply(entities, u).Text(ctx, errorStr+": no active request to cancel")
|
||
}
|
||
q.mutex.Unlock()
|
||
}
|
||
|
||
func (q *DownloadQueue) updateProgress(ctx context.Context, qEntry *DownloadQueueEntry, progressStr string, progressPercent int) {
|
||
if progressPercent < 0 {
|
||
qEntry.editReply(ctx, progressStr+"... (no progress available)\n"+q.currentlyDownloadedEntry.sourceCodecInfo)
|
||
return
|
||
}
|
||
if progressPercent == 0 {
|
||
qEntry.editReply(ctx, progressStr+"..."+q.currentlyDownloadedEntry.progressInfo+"\n"+q.currentlyDownloadedEntry.sourceCodecInfo)
|
||
return
|
||
}
|
||
fmt.Print(" progress: ", progressPercent, "%\n")
|
||
qEntry.editReply(ctx, progressStr+": "+getProgressbar(progressPercent, progressBarLength)+q.currentlyDownloadedEntry.progressInfo+"\n"+q.currentlyDownloadedEntry.sourceCodecInfo)
|
||
q.currentlyDownloadedEntry.lastDisplayedProgressPercent = progressPercent
|
||
}
|
||
|
||
func (q *DownloadQueue) HandleProgressPercentUpdate(progressStr string, progressPercent int) {
|
||
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock()
|
||
defer q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock()
|
||
|
||
if q.currentlyDownloadedEntry.disableProgressPercentUpdate || q.currentlyDownloadedEntry.lastProgressPercent == progressPercent {
|
||
return
|
||
}
|
||
q.currentlyDownloadedEntry.lastProgressPercent = progressPercent
|
||
if progressPercent < 0 {
|
||
q.currentlyDownloadedEntry.disableProgressPercentUpdate = true
|
||
q.updateProgress(q.ctx, &q.entries[0], progressStr, progressPercent)
|
||
return
|
||
}
|
||
|
||
if q.currentlyDownloadedEntry.progressUpdateTimer != nil {
|
||
q.currentlyDownloadedEntry.progressUpdateTimer.Stop()
|
||
select {
|
||
case <-q.currentlyDownloadedEntry.progressUpdateTimer.C:
|
||
default:
|
||
}
|
||
}
|
||
|
||
timeElapsedSinceLastUpdate := time.Since(q.currentlyDownloadedEntry.lastProgressPercentUpdateAt)
|
||
if timeElapsedSinceLastUpdate < maxProgressPercentUpdateInterval {
|
||
q.currentlyDownloadedEntry.progressUpdateTimer = time.AfterFunc(maxProgressPercentUpdateInterval-timeElapsedSinceLastUpdate, func() {
|
||
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock()
|
||
if !q.currentlyDownloadedEntry.disableProgressPercentUpdate {
|
||
q.updateProgress(q.ctx, &q.entries[0], progressStr, progressPercent)
|
||
q.currentlyDownloadedEntry.lastProgressPercentUpdateAt = time.Now()
|
||
}
|
||
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock()
|
||
})
|
||
return
|
||
}
|
||
q.updateProgress(q.ctx, &q.entries[0], progressStr, progressPercent)
|
||
q.currentlyDownloadedEntry.lastProgressPercentUpdateAt = time.Now()
|
||
}
|
||
|
||
func (q *DownloadQueue) processQueueEntry(ctx context.Context, qEntry *DownloadQueueEntry) {
|
||
fromUsername := getFromUsername(qEntry.OrigEntities, qEntry.FromUser.UserID)
|
||
fmt.Print("processing request by")
|
||
if fromUsername != "" {
|
||
fmt.Print(" from ", fromUsername, "#", qEntry.FromUser.UserID)
|
||
}
|
||
|
||
// Handle video message downloads differently
|
||
if qEntry.IsVideoMessage {
|
||
fmt.Println(": [video message]")
|
||
q.processVideoMessageEntry(ctx, qEntry)
|
||
return
|
||
}
|
||
|
||
fmt.Println(":", qEntry.URL)
|
||
qEntry.editReply(ctx, processStartStr)
|
||
|
||
downloader := Downloader{
|
||
ConvertStartFunc: func(ctx context.Context, videoCodecs, audioCodecs, convertActionsNeeded string) {
|
||
q.currentlyDownloadedEntry.sourceCodecInfo = "🎬 Source: " + videoCodecs
|
||
if audioCodecs == "" {
|
||
q.currentlyDownloadedEntry.sourceCodecInfo += ", no audio"
|
||
} else {
|
||
if videoCodecs != "" {
|
||
q.currentlyDownloadedEntry.sourceCodecInfo += " / "
|
||
}
|
||
q.currentlyDownloadedEntry.sourceCodecInfo += audioCodecs
|
||
}
|
||
if convertActionsNeeded == "" {
|
||
q.currentlyDownloadedEntry.sourceCodecInfo += " (no conversion needed)"
|
||
} else {
|
||
q.currentlyDownloadedEntry.sourceCodecInfo += " (converting: " + convertActionsNeeded + ")"
|
||
}
|
||
qEntry.editReply(ctx, "🎬 Preparing download...\n"+q.currentlyDownloadedEntry.sourceCodecInfo)
|
||
},
|
||
UpdateProgressPercentFunc: q.HandleProgressPercentUpdate,
|
||
}
|
||
|
||
// Download the file to SAVE_DIR
|
||
dlResult, err := downloader.DownloadAndConvertURL(qEntry.Ctx, qEntry.OrigMsg.Message, qEntry.Format)
|
||
if err != nil {
|
||
fmt.Println(" error downloading:", err)
|
||
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock()
|
||
q.currentlyDownloadedEntry.disableProgressPercentUpdate = true
|
||
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock()
|
||
qEntry.editReply(ctx, fmt.Sprint(errorStr+": ", err))
|
||
return
|
||
}
|
||
|
||
// Probe the downloaded file to check codec compatibility
|
||
conv := Converter{
|
||
Format: qEntry.Format,
|
||
UpdateProgressPercentCallback: q.HandleProgressPercentUpdate,
|
||
}
|
||
|
||
if err := conv.ProbeFile(dlResult.FilePath); err != nil {
|
||
fmt.Println(" error probing file:", err)
|
||
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock()
|
||
q.currentlyDownloadedEntry.disableProgressPercentUpdate = true
|
||
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock()
|
||
qEntry.editReply(ctx, fmt.Sprint(errorStr+": ", err))
|
||
return
|
||
}
|
||
|
||
// Update codec info in UI
|
||
if downloader.ConvertStartFunc != nil {
|
||
downloader.ConvertStartFunc(ctx, conv.VideoCodecs, conv.AudioCodecs, conv.GetActionsNeeded())
|
||
}
|
||
|
||
// Check if file is small enough to upload to Telegram (<512MB)
|
||
if dlResult.FileSize >= telegramUploadThreshold {
|
||
// File too large, only save to disk
|
||
fmt.Printf(" file too large (%s >= 512MB), skipping Telegram upload\n", humanize.Bytes(uint64(dlResult.FileSize)))
|
||
qEntry.editReply(ctx, fmt.Sprintf("✅ Saved to server\n📁 %s\n💾 Size: %s\n⚠️ File too large for Telegram upload (>512MB)",
|
||
dlResult.FilePath, humanize.Bytes(uint64(dlResult.FileSize))))
|
||
qEntry.sendTypingCancelAction(ctx)
|
||
return
|
||
}
|
||
|
||
// File is small enough, process for upload
|
||
fmt.Println(" processing for upload...")
|
||
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock()
|
||
q.updateProgress(ctx, qEntry, processStr, q.currentlyDownloadedEntry.lastProgressPercent)
|
||
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock()
|
||
|
||
// Convert if needed, then upload
|
||
uploadPath := dlResult.FilePath
|
||
uploadFormat := "mkv"
|
||
if qEntry.Format == "mp3" {
|
||
uploadFormat = "mp3"
|
||
}
|
||
|
||
// For video format, determine the actual format from filename
|
||
if qEntry.Format != "mp3" {
|
||
ext := filepath.Ext(uploadPath)
|
||
if ext != "" {
|
||
uploadFormat = ext[1:] // Remove the leading dot
|
||
}
|
||
}
|
||
|
||
if conv.NeedConvert() {
|
||
// Need conversion
|
||
outputPath, outputFormat, err := conv.ConvertIfNeeded(qEntry.Ctx, dlResult.FilePath, params.SaveDir)
|
||
if err != nil {
|
||
fmt.Println(" error converting:", err)
|
||
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock()
|
||
q.currentlyDownloadedEntry.disableProgressPercentUpdate = true
|
||
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock()
|
||
qEntry.editReply(ctx, fmt.Sprint(errorStr+": ", err))
|
||
return
|
||
}
|
||
uploadPath = outputPath
|
||
uploadFormat = outputFormat
|
||
// Keep both original and converted files
|
||
}
|
||
|
||
// Open file for upload
|
||
file, err := os.Open(uploadPath)
|
||
if err != nil {
|
||
fmt.Println(" error opening file:", err)
|
||
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock()
|
||
q.currentlyDownloadedEntry.disableProgressPercentUpdate = true
|
||
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock()
|
||
qEntry.editReply(ctx, fmt.Sprint(errorStr+": ", err))
|
||
return
|
||
}
|
||
defer file.Close()
|
||
|
||
// Upload to Telegram with video dimensions
|
||
err = dlUploader.UploadFile(qEntry.Ctx, qEntry.OrigEntities, qEntry.OrigMsgUpdate, file, uploadFormat, dlResult.Title, conv.VideoWidth, conv.VideoHeight)
|
||
if err != nil {
|
||
fmt.Println(" error uploading:", err)
|
||
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock()
|
||
q.currentlyDownloadedEntry.disableProgressPercentUpdate = true
|
||
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock()
|
||
file.Close()
|
||
qEntry.editReply(ctx, fmt.Sprint(errorStr+": ", err))
|
||
return
|
||
}
|
||
file.Close()
|
||
|
||
// Remove the uploaded file (since it's saved in SAVE_DIR, we keep it only if needed)
|
||
// Actually, we keep the file in SAVE_DIR as requested
|
||
|
||
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Lock()
|
||
q.currentlyDownloadedEntry.disableProgressPercentUpdate = true
|
||
if qEntry.Canceled {
|
||
fmt.Print(" canceled\n")
|
||
q.updateProgress(ctx, qEntry, canceledStr, q.currentlyDownloadedEntry.lastProgressPercent)
|
||
} else if q.currentlyDownloadedEntry.lastDisplayedProgressPercent < 100 {
|
||
fmt.Print(" progress: 100%\n")
|
||
q.updateProgress(ctx, qEntry, uploadDoneStr, 100)
|
||
}
|
||
q.currentlyDownloadedEntry.progressPercentUpdateMutex.Unlock()
|
||
qEntry.sendTypingCancelAction(ctx)
|
||
}
|
||
|
||
func (q *DownloadQueue) processVideoMessageEntry(ctx context.Context, qEntry *DownloadQueueEntry) {
|
||
videoFile := qEntry.VideoDocument
|
||
if videoFile == nil {
|
||
qEntry.editReply(ctx, errorStr+": could not get video info")
|
||
return
|
||
}
|
||
|
||
// Get file name
|
||
var fileName string
|
||
for _, attr := range videoFile.Attributes {
|
||
if docAttr, ok := attr.(*tg.DocumentAttributeFilename); ok {
|
||
fileName = docAttr.FileName
|
||
break
|
||
}
|
||
}
|
||
if fileName == "" {
|
||
fileName = fmt.Sprintf("video_%d.mp4", time.Now().Unix())
|
||
}
|
||
|
||
// Create safe filename with date prefix: YYYY-MM-DD-{timestamp}-{filename}.{ext}
|
||
now := time.Now()
|
||
dateStr := now.Format("2006-01-02")
|
||
timestamp := now.Unix()
|
||
ext := filepath.Ext(fileName)
|
||
if ext == "" {
|
||
ext = ".mp4"
|
||
}
|
||
safeFileName := fmt.Sprintf("%s-%d%s", dateStr, timestamp, ext)
|
||
filePath := filepath.Join(params.SaveDir, safeFileName)
|
||
|
||
// Check if file already exists
|
||
if _, err := os.Stat(filePath); err == nil {
|
||
for i := 1; i < 1000; i++ {
|
||
safeFileName = fmt.Sprintf("%s-%d-%d%s", dateStr, timestamp, i, ext)
|
||
filePath = filepath.Join(params.SaveDir, safeFileName)
|
||
if _, err := os.Stat(filePath); os.IsNotExist(err) {
|
||
break
|
||
}
|
||
}
|
||
}
|
||
|
||
// Update progress message
|
||
qEntry.editReply(ctx, "⬇️ Downloading video...")
|
||
|
||
// Get file size
|
||
fileSize := videoFile.Size
|
||
|
||
// Download file using Telegram client
|
||
fileLoc := &tg.InputDocumentFileLocation{
|
||
ID: videoFile.ID,
|
||
AccessHash: videoFile.AccessHash,
|
||
FileReference: videoFile.FileReference,
|
||
}
|
||
fmt.Printf(" downloading video: %s (size: %s)\n", fileName, humanize.Bytes(uint64(fileSize)))
|
||
|
||
file, err := os.Create(filePath)
|
||
if err != nil {
|
||
qEntry.editReply(ctx, errorStr+": could not create file")
|
||
return
|
||
}
|
||
|
||
// Download with progress
|
||
offset := int64(0)
|
||
chunkSize := int64(1024 * 1024 * 10) // 10MB chunks
|
||
lastPercent := 0
|
||
written := int64(0)
|
||
|
||
for offset < fileSize {
|
||
select {
|
||
case <-qEntry.Ctx.Done():
|
||
file.Close()
|
||
os.Remove(filePath)
|
||
qEntry.editReply(ctx, "❌ Canceled")
|
||
qEntry.Canceled = true
|
||
return
|
||
default:
|
||
}
|
||
|
||
if offset+chunkSize > fileSize {
|
||
chunkSize = fileSize - offset
|
||
}
|
||
// Telegram API requires limit to be divisible by 1KB
|
||
if chunkSize%1024 != 0 {
|
||
// Round down to nearest 1KB boundary
|
||
chunkSize = (chunkSize / 1024) * 1024
|
||
if chunkSize == 0 {
|
||
// If remaining data is less than 1KB, round up to 1KB
|
||
// The API will return the actual remaining bytes
|
||
chunkSize = 1024
|
||
}
|
||
}
|
||
|
||
loc := &tg.InputDocumentFileLocation{
|
||
ID: videoFile.ID,
|
||
AccessHash: videoFile.AccessHash,
|
||
FileReference: fileLoc.FileReference,
|
||
}
|
||
|
||
chunk, err := telegramClient.API().UploadGetFile(qEntry.Ctx, &tg.UploadGetFileRequest{
|
||
Location: loc,
|
||
Offset: offset,
|
||
Limit: int(chunkSize),
|
||
Precise: true,
|
||
CDNSupported: false,
|
||
})
|
||
if err != nil {
|
||
file.Close()
|
||
os.Remove(filePath)
|
||
qEntry.editReply(ctx, errorStr+": failed to download chunk: "+err.Error())
|
||
return
|
||
}
|
||
|
||
chunkData, ok := chunk.(*tg.UploadFile)
|
||
if !ok {
|
||
file.Close()
|
||
os.Remove(filePath)
|
||
qEntry.editReply(ctx, errorStr+": unexpected response type")
|
||
return
|
||
}
|
||
|
||
n, err := file.Write(chunkData.Bytes)
|
||
if err != nil {
|
||
file.Close()
|
||
os.Remove(filePath)
|
||
qEntry.editReply(ctx, errorStr+": failed to write to file")
|
||
return
|
||
}
|
||
|
||
offset += int64(n)
|
||
written += int64(n)
|
||
|
||
// Update progress
|
||
if fileSize > 0 {
|
||
percent := int(float64(written) * 100 / float64(fileSize))
|
||
if percent != lastPercent && percent%10 == 0 {
|
||
lastPercent = percent
|
||
progressBar := getProgressbar(percent, progressBarLength)
|
||
qEntry.editReply(ctx, "⬇️ Downloading video...\n"+progressBar)
|
||
}
|
||
}
|
||
}
|
||
|
||
file.Close()
|
||
|
||
// Send success message
|
||
savedMsg := fmt.Sprintf("✅ Video saved\n📁 %s\n💾 Size: %s", safeFileName, humanize.Bytes(uint64(written)))
|
||
qEntry.editReply(ctx, savedMsg)
|
||
|
||
fmt.Printf(" video saved to: %s\n", filePath)
|
||
}
|
||
|
||
func (q *DownloadQueue) processor() {
|
||
for {
|
||
q.mutex.Lock()
|
||
if (len(q.entries)) == 0 {
|
||
q.mutex.Unlock()
|
||
<-q.processReqChan
|
||
continue
|
||
}
|
||
|
||
// Updating queue positions for all waiting entries.
|
||
for i := 1; i < len(q.entries); i++ {
|
||
q.entries[i].editReply(q.ctx, q.getQueuePositionString(i))
|
||
q.entries[i].sendTypingCancelAction(q.ctx)
|
||
}
|
||
|
||
q.entries[0].Ctx, q.entries[0].CtxCancel = context.WithTimeout(q.ctx, downloadAndConvertTimeout)
|
||
|
||
qEntry := &q.entries[0]
|
||
q.mutex.Unlock()
|
||
|
||
q.currentlyDownloadedEntry = currentlyDownloadedEntryType{}
|
||
|
||
q.processQueueEntry(q.ctx, qEntry)
|
||
|
||
q.mutex.Lock()
|
||
q.entries[0].CtxCancel()
|
||
q.entries = q.entries[1:]
|
||
if len(q.entries) == 0 {
|
||
fmt.Print("finished queue processing\n")
|
||
}
|
||
q.mutex.Unlock()
|
||
}
|
||
}
|
||
|
||
func (q *DownloadQueue) Init(ctx context.Context) {
|
||
q.ctx = ctx
|
||
q.processReqChan = make(chan bool)
|
||
go q.processor()
|
||
}
|