package main

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/dustin/go-humanize"
	"github.com/gotd/td/telegram/message"
	"github.com/gotd/td/tg"
)

// Status texts shown in the bot's reply message.
//
// NOTE(review): the emoji prefixes look mis-encoded in this copy of the file;
// confirm the file is stored as UTF-8 before shipping.
const (
	processStartStr = "šŸ” Getting information..."
	processStr      = "šŸ”Ø Processing"
	uploadStr       = "ā˜ļø Uploading"
	uploadDoneStr   = "šŸ Uploading"
	errorStr        = "āŒ Error"
	canceledStr     = "āŒ Canceled"
)

// maxProgressPercentUpdateInterval is the minimum spacing between two
// progress edits of the reply message.
const maxProgressPercentUpdateInterval = time.Second

// progressBarLength is the length passed to getProgressbar.
const progressBarLength = 10

// DownloadQueueEntry is one queued request together with the reply message
// used to report its status.
type DownloadQueueEntry struct {
	URL    string
	Format string

	// IsVideoMessage is true if this entry is for a video message download.
	IsVideoMessage bool
	// VideoDocument stores the video document info for video messages.
	VideoDocument *tg.Document

	OrigEntities  tg.Entities
	OrigMsgUpdate *tg.UpdateNewMessage
	OrigMsg       *tg.Message
	// FromUser and FromGroup are filled from resolveMsgSrc(OrigMsg).
	FromUser  *tg.PeerUser
	FromGroup *tg.PeerChat

	// Reply is the builder the status message was sent with; ReplyMsg is the
	// sent status message and may be nil when its ID could not be determined.
	Reply    *message.Builder
	ReplyMsg *tg.UpdateShortSentMessage

	// Ctx and CtxCancel are set by the queue processor right before the entry
	// is processed; both are nil while the entry is still waiting.
	Ctx       context.Context
	CtxCancel context.CancelFunc
	Canceled  bool
}

func (e *DownloadQueueEntry) sendTypingAction(ctx context.Context) {
}

func (e *DownloadQueueEntry) sendTypingCancelAction(ctx context.Context) {
}

// editReply replaces the text of the entry's status message with s. It does
// nothing to the message when no status message is known.
func (e *DownloadQueueEntry) editReply(ctx context.Context, s string) {
	if e.Reply != nil && e.ReplyMsg != nil {
		// Status updates are best effort: a failed edit must not abort the
		// download, so the error is deliberately dropped.
		_, _ = e.Reply.Edit(e.ReplyMsg.ID).Text(ctx, s)
	}
	e.sendTypingAction(ctx)
}

// wasCanceled reports whether the entry was canceled. The context is checked
// in addition to the Canceled flag because the processor works on a pointer
// into the queue slice, and that pointer no longer aliases the queue's copy
// of the entry once append has reallocated the slice.
func (e *DownloadQueueEntry) wasCanceled() bool {
	return e.Canceled || (e.Ctx != nil && e.Ctx.Err() == context.Canceled)
}

// downloadVideo streams the entry's VideoDocument into dst in chunks of at
// most 1MB and reports progress by editing the reply at every 10% step.
//
// It returns the number of bytes written. canceled is true when e.Ctx ended
// before the download finished; err describes any other failure and is worded
// so that it can be shown to the user after errorStr.
func (e *DownloadQueueEntry) downloadVideo(ctx context.Context, dst *os.File) (written int64, canceled bool, err error) {
	const maxChunkSize = 1024 * 1024

	doc := e.VideoDocument
	loc := &tg.InputDocumentFileLocation{
		ID:            doc.ID,
		AccessHash:    doc.AccessHash,
		FileReference: doc.FileReference,
	}
	fileSize := doc.Size
	lastPercent := 0

	for written < fileSize {
		select {
		case <-e.Ctx.Done():
			return written, true, nil
		default:
		}

		// Telegram API requires limit to be divisible by 1KB: round the tail
		// down to a 1KB boundary, or up to 1KB when less than that remains
		// (the API then returns only the actual remaining bytes).
		limit := int64(maxChunkSize)
		if remaining := fileSize - written; remaining < limit {
			limit = remaining / 1024 * 1024
			if limit == 0 {
				limit = 1024
			}
		}

		resp, err := telegramClient.API().UploadGetFile(e.Ctx, &tg.UploadGetFileRequest{
			Location:     loc,
			Offset:       written,
			Limit:        int(limit),
			Precise:      true,
			CDNSupported: false,
		})
		if err != nil {
			return written, false, fmt.Errorf("failed to download chunk: %w", err)
		}
		chunk, ok := resp.(*tg.UploadFile)
		if !ok {
			return written, false, fmt.Errorf("unexpected response type")
		}
		if len(chunk.Bytes) == 0 {
			// Without this guard the offset would never advance and the loop
			// would spin forever.
			return written, false, fmt.Errorf("server returned no data before end of file")
		}
		n, err := dst.Write(chunk.Bytes)
		if err != nil {
			return written, false, fmt.Errorf("failed to write to file")
		}
		written += int64(n)

		percent := int(float64(written) * 100 / float64(fileSize))
		if percent != lastPercent && percent%10 == 0 {
			lastPercent = percent
			e.editReply(ctx, "ā¬‡ļø Downloading video...\n"+getProgressbar(percent, progressBarLength))
		}
	}
	return written, false, nil
}

// currentlyDownloadedEntryType holds the progress-reporting state of the
// entry that is being processed right now.
type currentlyDownloadedEntryType struct {
	disableProgressPercentUpdate bool
	progressPercentUpdateMutex   sync.Mutex
	lastProgressPercentUpdateAt  time.Time
	lastProgressPercent          int
	lastDisplayedProgressPercent int
	progressUpdateTimer          *time.Timer

	sourceCodecInfo string
	progressInfo    string

	// entry is the queue entry progress updates are applied to; nil while
	// nothing is being processed.
	entry *DownloadQueueEntry
}

// reset clears all progress state and starts tracking entry.
func (c *currentlyDownloadedEntryType) reset(entry *DownloadQueueEntry) {
	c.progressPercentUpdateMutex.Lock()
	defer c.progressPercentUpdateMutex.Unlock()

	if c.progressUpdateTimer != nil {
		c.progressUpdateTimer.Stop()
		c.progressUpdateTimer = nil
	}
	c.disableProgressPercentUpdate = false
	c.lastProgressPercentUpdateAt = time.Time{}
	c.lastProgressPercent = 0
	c.lastDisplayedProgressPercent = 0
	c.sourceCodecInfo = ""
	c.progressInfo = ""
	c.entry = entry
}

// disable turns off further progress updates for the current entry.
func (c *currentlyDownloadedEntryType) disable() {
	c.progressPercentUpdateMutex.Lock()
	c.disableProgressPercentUpdate = true
	c.progressPercentUpdateMutex.Unlock()
}

// stop disables progress updates, stops a pending deferred update and
// forgets the tracked entry.
func (c *currentlyDownloadedEntryType) stop() {
	c.progressPercentUpdateMutex.Lock()
	defer c.progressPercentUpdateMutex.Unlock()

	c.disableProgressPercentUpdate = true
	if c.progressUpdateTimer != nil {
		c.progressUpdateTimer.Stop()
		c.progressUpdateTimer = nil
	}
	c.entry = nil
}

// DownloadQueue processes download requests one at a time in the order they
// were added. entries[0] is the entry currently being processed.
type DownloadQueue struct {
	ctx context.Context

	mutex          sync.Mutex
	entries        []DownloadQueueEntry
	processReqChan chan bool

	currentlyDownloadedEntry currentlyDownloadedEntryType
}

// getQueuePositionString returns the status text for a request waiting at
// queue position pos.
func (q *DownloadQueue) getQueuePositionString(pos int) string {
	return "šŸ‘Øā€šŸ‘¦ā€šŸ‘¦ Request queued at position #" + fmt.Sprint(pos)
}

// Add adds a URL download in the given format to the queue.
func (q *DownloadQueue) Add(ctx context.Context, entities tg.Entities, u *tg.UpdateNewMessage, url, format string) {
	q.addEntry(ctx, entities, u, url, format, false, nil)
}

// AddVideo adds a video message download to the queue.
func (q *DownloadQueue) AddVideo(ctx context.Context, entities tg.Entities, u *tg.UpdateNewMessage, videoDocument *tg.Document) {
	q.addEntry(ctx, entities, u, "", "", true, videoDocument)
}

// addEntry sends the initial status reply, appends the request to the queue
// and wakes the processor. The request is dropped (and logged) when the
// update carries no regular message or the status reply cannot be sent.
func (q *DownloadQueue) addEntry(ctx context.Context, entities tg.Entities, u *tg.UpdateNewMessage, url, format string, isVideo bool, videoDocument *tg.Document) {
	if !q.enqueue(ctx, entities, u, url, format, isVideo, videoDocument) {
		return
	}

	// Non-blocking wake-up; a signal that is already pending is enough.
	select {
	case q.processReqChan <- true:
	default:
	}
}

// enqueue does the locked part of addEntry and reports whether an entry was
// appended.
func (q *DownloadQueue) enqueue(ctx context.Context, entities tg.Entities, u *tg.UpdateNewMessage, url, format string, isVideo bool, videoDocument *tg.Document) bool {
	origMsg, ok := u.Message.(*tg.Message)
	if !ok {
		fmt.Println(" ignoring request: update does not contain a regular message")
		return false
	}

	// The lock is held across the send so the announced queue position
	// matches the position the entry actually gets.
	q.mutex.Lock()
	defer q.mutex.Unlock()

	var replyStr string
	switch {
	case len(q.entries) > 0:
		fmt.Println(" queueing request at position #", len(q.entries))
		replyStr = q.getQueuePositionString(len(q.entries))
	case isVideo:
		replyStr = "ā¬‡ļø Downloading video..."
	default:
		replyStr = processStartStr
	}

	newEntry := DownloadQueueEntry{
		URL:            url,
		Format:         format,
		IsVideoMessage: isVideo,
		VideoDocument:  videoDocument,
		OrigEntities:   entities,
		OrigMsgUpdate:  u,
		OrigMsg:        origMsg,
	}

	newEntry.Reply = telegramSender.Reply(entities, u)
	sent, err := newEntry.Reply.Text(ctx, replyStr)
	if err != nil {
		fmt.Println(" error sending reply:", err)
		return false
	}
	if replyMsg, ok := sent.(*tg.UpdateShortSentMessage); ok {
		newEntry.ReplyMsg = replyMsg
	} else {
		// The request is still processed; editReply skips status edits when
		// ReplyMsg is nil.
		fmt.Printf(" unexpected reply type %T, status updates disabled for this request\n", sent)
	}

	newEntry.FromUser, newEntry.FromGroup = resolveMsgSrc(origMsg)

	q.entries = append(q.entries, newEntry)
	return true
}

// CancelCurrentEntry cancels the entry at the head of the queue, or replies
// with an error when the queue is empty.
func (q *DownloadQueue) CancelCurrentEntry(ctx context.Context, entities tg.Entities, u *tg.UpdateNewMessage, url string) {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	if len(q.entries) == 0 {
		fmt.Println(" no active request to cancel")
		// Best-effort notification; there is nothing to do if it fails.
		_, _ = telegramSender.Reply(entities, u).Text(ctx, errorStr+": no active request to cancel")
		return
	}

	q.entries[0].Canceled = true
	// CtxCancel is nil until the processor has started the entry; in that
	// case the processor cancels it as soon as it picks it up.
	if cancel := q.entries[0].CtxCancel; cancel != nil {
		cancel()
	}
}

// updateProgress edits the reply of qEntry to show progressStr together with
// the given percentage. A negative percentage means no progress information
// is available; zero shows the text without a progress bar.
//
// It reads and writes q.currentlyDownloadedEntry without locking, so callers
// must hold progressPercentUpdateMutex.
func (q *DownloadQueue) updateProgress(ctx context.Context, qEntry *DownloadQueueEntry, progressStr string, progressPercent int) {
	cur := &q.currentlyDownloadedEntry

	switch {
	case progressPercent < 0:
		qEntry.editReply(ctx, progressStr+"... (no progress available)\n"+cur.sourceCodecInfo)
	case progressPercent == 0:
		qEntry.editReply(ctx, progressStr+"..."+cur.progressInfo+"\n"+cur.sourceCodecInfo)
	default:
		fmt.Print(" progress: ", progressPercent, "%\n")
		qEntry.editReply(ctx, progressStr+": "+getProgressbar(progressPercent, progressBarLength)+cur.progressInfo+"\n"+cur.sourceCodecInfo)
		cur.lastDisplayedProgressPercent = progressPercent
	}
}

// HandleProgressPercentUpdate records a new progress value for the entry
// being processed and updates its reply, at most once per
// maxProgressPercentUpdateInterval. An update arriving earlier is deferred
// with a timer and replaces any previously deferred one. A negative
// progressPercent is shown once and disables further updates.
func (q *DownloadQueue) HandleProgressPercentUpdate(progressStr string, progressPercent int) {
	cur := &q.currentlyDownloadedEntry
	cur.progressPercentUpdateMutex.Lock()
	defer cur.progressPercentUpdateMutex.Unlock()

	entry := cur.entry
	if entry == nil || cur.disableProgressPercentUpdate || cur.lastProgressPercent == progressPercent {
		return
	}
	cur.lastProgressPercent = progressPercent

	if progressPercent < 0 {
		cur.disableProgressPercentUpdate = true
		q.updateProgress(q.ctx, entry, progressStr, progressPercent)
		return
	}

	if cur.progressUpdateTimer != nil {
		cur.progressUpdateTimer.Stop()
		cur.progressUpdateTimer = nil
	}

	elapsed := time.Since(cur.lastProgressPercentUpdateAt)
	if elapsed < maxProgressPercentUpdateInterval {
		cur.progressUpdateTimer = time.AfterFunc(maxProgressPercentUpdateInterval-elapsed, func() {
			cur.progressPercentUpdateMutex.Lock()
			defer cur.progressPercentUpdateMutex.Unlock()

			// Stop cannot recall a callback that has already started, so make
			// sure the update still belongs to the entry being processed.
			if cur.disableProgressPercentUpdate || cur.entry != entry {
				return
			}
			q.updateProgress(q.ctx, entry, progressStr, progressPercent)
			cur.lastProgressPercentUpdateAt = time.Now()
		})
		return
	}

	q.updateProgress(q.ctx, entry, progressStr, progressPercent)
	cur.lastProgressPercentUpdateAt = time.Now()
}

// failEntry logs err for the given action, disables further progress updates
// and shows the error in the entry's reply.
func (q *DownloadQueue) failEntry(ctx context.Context, qEntry *DownloadQueueEntry, action string, err error) {
	fmt.Println(" error "+action+":", err)
	q.currentlyDownloadedEntry.disable()
	qEntry.editReply(ctx, fmt.Sprint(errorStr+": ", err))
}

// processQueueEntry handles a single queue entry. Video messages are passed
// to processVideoMessageEntry. URL requests are downloaded, probed, converted
// if needed and uploaded to Telegram; files at or above
// telegramUploadThreshold are only kept on disk.
func (q *DownloadQueue) processQueueEntry(ctx context.Context, qEntry *DownloadQueueEntry) {
	cur := &q.currentlyDownloadedEntry

	fmt.Print("processing request by")
	if qEntry.FromUser != nil {
		if fromUsername := getFromUsername(qEntry.OrigEntities, qEntry.FromUser.UserID); fromUsername != "" {
			fmt.Print(" from ", fromUsername, "#", qEntry.FromUser.UserID)
		}
	}

	// Handle video message downloads differently.
	if qEntry.IsVideoMessage {
		fmt.Println(": [video message]")
		q.processVideoMessageEntry(ctx, qEntry)
		return
	}

	fmt.Println(":", qEntry.URL)

	qEntry.editReply(ctx, processStartStr)

	// showCodecInfo stores the source codec summary and shows it in the reply.
	showCodecInfo := func(ctx context.Context, videoCodecs, audioCodecs, convertActionsNeeded string) {
		info := "šŸŽ¬ Source: " + videoCodecs
		if audioCodecs == "" {
			info += ", no audio"
		} else {
			if videoCodecs != "" {
				info += " / "
			}
			info += audioCodecs
		}
		if convertActionsNeeded == "" {
			info += " (no conversion needed)"
		} else {
			info += " (converting: " + convertActionsNeeded + ")"
		}

		// sourceCodecInfo is also read by deferred progress updates running
		// on a timer goroutine.
		cur.progressPercentUpdateMutex.Lock()
		cur.sourceCodecInfo = info
		cur.progressPercentUpdateMutex.Unlock()

		qEntry.editReply(ctx, "šŸŽ¬ Preparing download...\n"+info)
	}

	downloader := Downloader{
		ConvertStartFunc:          showCodecInfo,
		UpdateProgressPercentFunc: q.HandleProgressPercentUpdate,
	}

	// Download the file to SAVE_DIR.
	// NOTE(review): the full message text is passed here, not qEntry.URL —
	// confirm this is intended.
	dlResult, err := downloader.DownloadAndConvertURL(qEntry.Ctx, qEntry.OrigMsg.Message, qEntry.Format)
	if err != nil {
		q.failEntry(ctx, qEntry, "downloading", err)
		return
	}

	// Probe the downloaded file to check codec compatibility.
	conv := Converter{
		Format:                        qEntry.Format,
		UpdateProgressPercentCallback: q.HandleProgressPercentUpdate,
	}
	if err := conv.ProbeFile(dlResult.FilePath); err != nil {
		q.failEntry(ctx, qEntry, "probing file", err)
		return
	}

	// Update codec info in UI.
	showCodecInfo(ctx, conv.VideoCodecs, conv.AudioCodecs, conv.GetActionsNeeded())

	// Check if file is small enough to upload to Telegram (<512MB).
	if dlResult.FileSize >= telegramUploadThreshold {
		// File too large, only save to disk.
		fmt.Printf(" file too large (%s >= 512MB), skipping Telegram upload\n", humanize.Bytes(uint64(dlResult.FileSize)))
		qEntry.editReply(ctx, fmt.Sprintf("āœ… Saved to server\nšŸ“ %s\nšŸ’¾ Size: %s\nāš ļø File too large for Telegram upload (>512MB)",
			dlResult.FilePath, humanize.Bytes(uint64(dlResult.FileSize))))
		qEntry.sendTypingCancelAction(ctx)
		return
	}

	// File is small enough, process for upload.
	fmt.Println(" processing for upload...")
	cur.progressPercentUpdateMutex.Lock()
	q.updateProgress(ctx, qEntry, processStr, cur.lastProgressPercent)
	cur.progressPercentUpdateMutex.Unlock()

	// Upload format: mp3 requests are always mp3; for video the format is
	// taken from the file extension, falling back to mkv.
	uploadPath := dlResult.FilePath
	uploadFormat := "mkv"
	if qEntry.Format == "mp3" {
		uploadFormat = "mp3"
	} else if ext := filepath.Ext(uploadPath); len(ext) > 1 {
		uploadFormat = ext[1:] // Remove the leading dot.
	}

	if conv.NeedConvert() {
		outputPath, outputFormat, err := conv.ConvertIfNeeded(qEntry.Ctx, dlResult.FilePath, params.SaveDir)
		if err != nil {
			q.failEntry(ctx, qEntry, "converting", err)
			return
		}
		// Both the original and the converted file are kept.
		uploadPath = outputPath
		uploadFormat = outputFormat
	}

	file, err := os.Open(uploadPath)
	if err != nil {
		q.failEntry(ctx, qEntry, "opening file", err)
		return
	}
	defer file.Close()

	// Upload to Telegram with video dimensions.
	err = dlUploader.UploadFile(qEntry.Ctx, qEntry.OrigEntities, qEntry.OrigMsgUpdate, file, uploadFormat, dlResult.Title, conv.VideoWidth, conv.VideoHeight)
	if err != nil {
		q.failEntry(ctx, qEntry, "uploading", err)
		return
	}

	// The uploaded file is kept in SAVE_DIR.

	cur.progressPercentUpdateMutex.Lock()
	cur.disableProgressPercentUpdate = true
	switch {
	case qEntry.wasCanceled():
		fmt.Print(" canceled\n")
		q.updateProgress(ctx, qEntry, canceledStr, cur.lastProgressPercent)
	case cur.lastDisplayedProgressPercent < 100:
		// updateProgress logs the percentage itself.
		q.updateProgress(ctx, qEntry, uploadDoneStr, 100)
	}
	cur.progressPercentUpdateMutex.Unlock()

	qEntry.sendTypingCancelAction(ctx)
}

// processVideoMessageEntry downloads the entry's video document from
// Telegram into params.SaveDir and reports the result in the reply. A
// partially written file is removed when the download fails or is canceled.
func (q *DownloadQueue) processVideoMessageEntry(ctx context.Context, qEntry *DownloadQueueEntry) {
	videoFile := qEntry.VideoDocument
	if videoFile == nil {
		qEntry.editReply(ctx, errorStr+": could not get video info")
		return
	}

	now := time.Now()

	// The original file name is only used for logging and for its extension.
	var fileName string
	for _, attr := range videoFile.Attributes {
		if docAttr, ok := attr.(*tg.DocumentAttributeFilename); ok {
			fileName = docAttr.FileName
			break
		}
	}
	if fileName == "" {
		fileName = fmt.Sprintf("video_%d.mp4", now.Unix())
	}
	ext := filepath.Ext(fileName)
	if ext == "" {
		ext = ".mp4"
	}

	// Safe file name: YYYY-MM-DD-{timestamp}{ext}, with a numeric suffix
	// appended when that name is already taken.
	dateStr := now.Format("2006-01-02")
	timestamp := now.Unix()
	safeFileName := fmt.Sprintf("%s-%d%s", dateStr, timestamp, ext)
	filePath := filepath.Join(params.SaveDir, safeFileName)
	if _, err := os.Stat(filePath); err == nil {
		for i := 1; i < 1000; i++ {
			safeFileName = fmt.Sprintf("%s-%d-%d%s", dateStr, timestamp, i, ext)
			filePath = filepath.Join(params.SaveDir, safeFileName)
			if _, err := os.Stat(filePath); os.IsNotExist(err) {
				break
			}
		}
	}

	qEntry.editReply(ctx, "ā¬‡ļø Downloading video...")

	fmt.Printf(" downloading video: %s (size: %s)\n", fileName, humanize.Bytes(uint64(videoFile.Size)))

	file, err := os.Create(filePath)
	if err != nil {
		qEntry.editReply(ctx, errorStr+": could not create file")
		return
	}

	written, canceled, err := qEntry.downloadVideo(ctx, file)
	closeErr := file.Close()

	switch {
	case canceled:
		_ = os.Remove(filePath) // best-effort cleanup of the partial file
		qEntry.editReply(ctx, canceledStr)
		qEntry.Canceled = true
		return
	case err != nil:
		_ = os.Remove(filePath) // best-effort cleanup of the partial file
		qEntry.editReply(ctx, errorStr+": "+err.Error())
		return
	case closeErr != nil:
		// A failed close can mean buffered data never reached the disk.
		_ = os.Remove(filePath)
		qEntry.editReply(ctx, errorStr+": failed to write to file")
		return
	}

	savedMsg := fmt.Sprintf("āœ… Video saved\nšŸ“ %s\nšŸ’¾ Size: %s", safeFileName, humanize.Bytes(uint64(written)))
	qEntry.editReply(ctx, savedMsg)
	fmt.Printf(" video saved to: %s\n", filePath)
}

// processor runs the queue: it processes entries[0], removes it and repeats.
// While the queue is empty it waits for a wake-up from addEntry and returns
// once the queue's context is done.
func (q *DownloadQueue) processor() {
	for {
		q.mutex.Lock()
		if len(q.entries) == 0 {
			q.mutex.Unlock()
			select {
			case <-q.processReqChan:
			case <-q.ctx.Done():
				return
			}
			continue
		}

		// Updating queue positions for all waiting entries.
		for i := 1; i < len(q.entries); i++ {
			q.entries[i].editReply(q.ctx, q.getQueuePositionString(i))
			q.entries[i].sendTypingCancelAction(q.ctx)
		}

		qEntry := &q.entries[0]
		qEntry.Ctx, qEntry.CtxCancel = context.WithTimeout(q.ctx, downloadAndConvertTimeout)
		if qEntry.Canceled {
			// Canceled while still waiting, before a cancel func existed.
			qEntry.CtxCancel()
		}
		q.mutex.Unlock()

		q.currentlyDownloadedEntry.reset(qEntry)
		q.processQueueEntry(q.ctx, qEntry)
		// No deferred progress update may outlive the entry it belongs to.
		q.currentlyDownloadedEntry.stop()

		q.mutex.Lock()
		q.entries[0].CtxCancel()
		// Clear the slot so the backing array does not keep the finished
		// entry's data alive.
		q.entries[0] = DownloadQueueEntry{}
		q.entries = q.entries[1:]
		if len(q.entries) == 0 {
			fmt.Print("finished queue processing\n")
		}
		q.mutex.Unlock()
	}
}

// Init stores ctx as the queue's base context and starts the processor
// goroutine.
func (q *DownloadQueue) Init(ctx context.Context) {
	q.ctx = ctx
	// Capacity 1 keeps a wake-up pending when it is sent between the
	// processor's emptiness check and its receive; with an unbuffered channel
	// that signal would be dropped and the new entry left unprocessed.
	q.processReqChan = make(chan bool, 1)
	go q.processor()
}