简介
在telegram的过程中有很多的问题,比如无法指定视频缩略图、普通用户无法提升上传速度等等!
需求
1、自动截取视频第一帧为缩略图
2、多线程上传,提升上传速度
3、将mov、mkv等非mp4文件转换为mp4后上传
4、上传后的视频需要指出流媒体播放
2、多线程上传,提升上传速度
3、将mov、mkv等非mp4文件转换为mp4后上传
4、上传后的视频需要指出流媒体播放
代码
- 依赖库
pip install FastTelethonhelper
- 引用的方法
''' 功能: 判断是不是图片 ''' def is_image(mime_type): return (mime_type.find("image")>=0 or mime_type.find("jpg")>=0 or mime_type.find("jpeg")>=0) ''' 功能: 获取视频的详情 ''' def get_video_detail(filename): vc = cv2.VideoCapture(filename) size = int(vc.get(cv2.CAP_PROP_FRAME_COUNT)) width = int(vc.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(vc.get(cv2.CAP_PROP_FRAME_HEIGHT)) if vc.get(cv2.CAP_PROP_FPS) > 0: duration = int(vc.get(cv2.CAP_PROP_FRAME_COUNT) / vc.get(cv2.CAP_PROP_FPS)) else: duration = -1 return duration, width, height, size ''' 功能: 判断是不是视频 ''' def is_video(mime_type): return (mime_type.find("video")>=0) ''' 功能: 生成缩略图文件名 ''' def get_file_thumb_name(filename): name, ext = os.path.splitext(filename) return name + "_thumb" + ".jpg" ''' 功能: 判断是不是已经下载 ''' ef is_downloaded_media_file(filename, section, msg_id): return os.path.exists(os.path.join(Utils.get_download_path(section, msg_id),filename)) ''' 功能: 从视频中截取第一张图片 ''' def get_video_thumbnail(in_filename, out_filename, time=0.1): try: ( ffmpeg .input(in_filename, ss=time) # .filter('scale', width, -1) .output(out_filename, vframes=1) .overwrite_output() .run(capture_stdout=True, capture_stderr=True) ) return True except ffmpeg.Error as e: return False ''' 功能: 判断视频中是不是已经包含音频 ''' def is_contain_audio_from_mp4(filename): p = ffmpeg.probe(filename, select_streams='a') return len(p['streams'])>0
- 功能实现
字段说明:
1: section、msg_id: 参照telegram 下载
2: formatting_entities: 参照telethon.tl.types.MessageEntityBold
1: section、msg_id: 参照telegram 下载
2: formatting_entities: 参照telethon.tl.types.MessageEntityBold
''' 功能: 上传本地文件到telegram的channel files: 文件数组(example "e:\download\[section]\[msg_id]\1.mp4", "e:\download\[section]\[msg_id]\2.mp4") from_channel_id: 可以丢弃 from_group_id: to_channel_id: 目标channel to_group_id: 目标group text: message的文本 section: 本地存放文件的父文件夹 msg_id: 本地存放文件的子文件夹 comment_to_message_id: (None: 发送消息到channel, 否则: 发送消息回复) to_msg_in_grop_id: (如果comment_to_message_id->None: None, 否则: 消息在group中的id, 用于reply) formatting_entities: 消息的格式,比如粗体、斜体等 ''' async def _upload_media_to_message(self, files, from_channel_id, from_group_id, to_channel_id, to_group_id, text, section, msg_id, comment_to_message_id=None, to_msg_in_grop_id=None, formatting_entities=None): medias=[] for file in files: try: # utils: Telethon中的库 attributes, mime_type = utils.get_attributes( file, ) document = await fast_upload(self.client, file_location=file, name=os.path.basename(file)) if Utils.is_image(mime_type): medias.append(document) else: for attribute in attributes: if isinstance(attribute, DocumentAttributeVideo): if (attribute.duration == 0): duration, width, height, size = Utils.get_video_detail(file) attribute.duration = duration attribute.w = width attribute.h = height attribute.supports_streaming = True # attribute.nosound = (not Utils.is_contain_audio_from_mp4(file)) thumb_image = None if Utils.is_video(mime_type): is_have_thumbs_file = True thumbs_file_name = Utils.get_file_thumb_name(file) if not (Utils.is_downloaded_media_file(os.path.basename(thumbs_file_name), section, msg_id)): is_have_thumbs_file = Utils.get_video_thumbnail(file, thumbs_file_name) if (is_have_thumbs_file): thumb_image = await fast_upload(self.client, file_location=thumbs_file_name, name=os.path.basename(thumbs_file_name)) medias.append(InputMediaUploadedDocument( file=document, mime_type=mime_type, attributes=attributes, thumb=thumb_image, force_file = (not Utils.is_video(mime_type)), nosound_video = (not Utils.is_contain_audio_from_mp4(file)) )) logger.info("upload [{}] [{}] from [{}]-[{}] -> [{}]", "channel" if comment_to_message_id is None else "group", os.path.basename(file), section, from_channel_id, comment_to_message_id) except FloodWaitError as fwe: logger.exception(fwe) # asyncio.sleep(delay=fwe.seconds) time.sleep(fwe.seconds) except PremiumAccountRequiredError as par: continue except BadRequestError as bre: logger.info("upload [{}] [{}] from [{}]-[{}] -> [{}]\n error is {}", "channel" if comment_to_message_id is None else "group", os.path.basename(file), section, from_channel_id, comment_to_message_id, bre) continue except Exception as err: logger.info("upload [{}] [{}] from [{}]-[{}] -> [{}]\n error is {}", "channel" if comment_to_message_id is None else "group", os.path.basename(file), section, from_channel_id, comment_to_message_id, err) continue if (len(medias)>0): try: if (comment_to_message_id is None): message = await self.client.send_message(entity=Utils.intify(to_channel_id), file=medias, message=text, parse_mode="markdown", formatting_entities=formatting_entities) else: message = await self.client.send_message(entity=Utils.intify(to_group_id), file=medias, message=text, reply_to=to_msg_in_grop_id, formatting_entities=formatting_entities) except FloodWaitError as fwe: print(f'{fwe}') time.sleep(fwe.seconds) except Exception as e: raise Exception("upload send [{}] {} from [{}]-[{}] -> [{}]\n error is {}", "channel" if comment_to_message_id is None else "group", [os.path.basename(f) for f in files], section, from_channel_id, comment_to_message_id, e) logger.info("upload send [{}] {} from [{}]-[{}] -> [{}]", "channel" if comment_to_message_id is None else "group", [os.path.basename(f) for f in files], section, from_channel_id, comment_to_message_id) return message[0] if isinstance(message, list) else message else: if (comment_to_message_id is None): raise Exception("upload from [{}]-[{}]-[{}] channel failed", section, from_channel_id, msg_id) else: raise Exception("upload from [{}]-[{}]-[{}] -> [{}] group failed", section, from_channel_id, msg_id, comment_to_message_id)
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。
评论(0)