Source code for bilibiliupload.bilibili

# -*- coding: utf-8 -*-

"""
:author: comwrg
:license: MIT
:time: 2017/06/09
"""

import base64
import hashlib
import logging
import math
import os
import re
import requests
import rsa
import time

from io import BufferedReader
from typing import *
from urllib import parse

from requests.adapters import HTTPAdapter
from urllib3 import Retry

log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())

[docs]class VideoPart: def __init__(self, path, title='', desc=''): self.path = path self.title = title self.desc = desc def __repr__(self): return '<{clazz}, path: {path}, title: {title}, desc: {desc}>'.format(clazz=self.__class__.__name__, path=self.path, title=self.title, desc=self.desc)
[docs]class Bilibili: def __init__(self, cookie=None): self.session = requests.session() # debug def debug_response(r, *args, **kwargs): log.debug(r.text) self.session.hooks = {'response': debug_response} # if cookie: self.session.headers["cookie"] = cookie self.csrf = re.search('bili_jct=(.*?)(;|$)', cookie).group(1) self.mid = re.search('DedeUserID=(.*?)(;|$)', cookie).group(1) self.session.headers['Accept'] = 'application/json, text/javascript, */*; q=0.01' self.session.headers['Referer'] = 'https://space.bilibili.com/{mid}/#!/'.format(mid=self.mid) # session.headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36' # session.headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
[docs] def login(self, user, pwd): """ :param user: username :type user: str :param pwd: password :type pwd: str :return: if success return True else raise Exception """ APPKEY = '4409e2ce8ffd12b8' ACTIONKEY = 'appkey' BUILD = 101800 DEVICE = 'android' MOBI_APP = 'android' PLATFORM = 'android' APPSECRET = '59b43e04ad6965f34319062b478f83dd' def md5(s): h = hashlib.md5() h.update(s.encode('utf-8')) return h.hexdigest() def sign(s): """ :return: return sign """ return md5(s + APPSECRET) def signed_body(body): """ :return: body which be added sign """ if isinstance(body, str): return body + '&sign=' + sign(body) elif isinstance(body, dict): ls = [] for k, v in body.items(): ls.append(k + '=' + v) body['sign'] = sign('&'.join(ls)) return body def getkey(): """ :return: hash, key """ r = self.session.post( 'https://passport.bilibili.com/api/oauth2/getKey', signed_body({'appkey': APPKEY}), ) # {"ts":1544152439,"code":0,"data":{"hash":"99c7573759582e0b","key":"-----BEGIN PUBLIC----- -----END PUBLIC KEY-----\n"}} json = r.json() data = json['data'] return data['hash'], data['key'] def cnn_captcha(img): url = "http://47.95.255.188:5000/code" data = {"image": img} r = requests.post(url, data=data) return r.text self.session.headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8' h, k = getkey() pwd = base64.b64encode( rsa.encrypt( (h + pwd).encode('utf-8'), rsa.PublicKey.load_pkcs1_openssl_pem(k.encode()), ), ) user = parse.quote_plus(user) pwd = parse.quote_plus(pwd) r = self.session.post( 'https://passport.bilibili.com/api/v2/oauth2/login', signed_body('appkey={appkey}&password={password}&username={username}' .format(appkey=APPKEY, username=user, password=pwd)), ) json = r.json() if json['code'] == -105: # need captcha self.session.headers['cookie'] = 'sid=xxxxxxxx' r = self.session.get('https://passport.bilibili.com/captcha') captcha = cnn_captcha(base64.b64encode(r.content)) r = self.session.post( 'https://passport.bilibili.com/api/v2/oauth2/login', signed_body('actionKey={actionKey}&appkey={appkey}&build={build}&captcha={captcha}&device={device}' '&mobi_app={mobi_app}&password={password}&platform={platform}&username={username}' .format(actionKey=ACTIONKEY, appkey=APPKEY, build=BUILD, captcha=captcha, device=DEVICE, mobi_app=MOBI_APP, password=pwd, platform=PLATFORM, username=user)), ) json = r.json() if json['code'] != 0: raise Exception(r.text) cookie = '; '.join( '%s=%s' % (item['name'], item['value']) for item in json['data']['cookie_info']['cookies'] ) self.session.headers["cookie"] = cookie self.csrf = re.search('bili_jct=(.*?)(;|$)', cookie).group(1) self.mid = re.search('DedeUserID=(.*?)(;|$)', cookie).group(1) self.session.headers['Accept'] = 'application/json, text/javascript, */*; q=0.01' self.session.headers['Referer'] = 'https://space.bilibili.com/{mid}/#!/'.format(mid=self.mid) return True
[docs] def upload(self, parts: Union[VideoPart, List[VideoPart]], title: str, tid: int, tag: List[str], desc: str, source='', cover='', no_reprint: bool = True, dynamic='', dtime=None, open_elec: bool = True, open_subtitle: bool = True, max_retry=5, ): """ :param parts: e.g. VideoPart('part path', 'part title', 'part desc'), or [VideoPart(...), VideoPart(...)] :type parts: Union[VideoPart, List[VideoPart]] :param title: video's title :type title: str :param tid: video type, see: https://member.bilibili.com/x/web/archive/pre or https://github.com/uupers/BiliSpider/wiki/%E8%A7%86%E9%A2%91%E5%88%86%E5%8C%BA%E5%AF%B9%E5%BA%94%E8%A1%A8 :type tid: int :param tag: video's tag :type tag: List[str] :param desc: video's description :type desc: str :param dtime: (optional) publish date timestamp (10 digits Unix timestamp e.g. 1551533438) :type dtime: int :param source: (optional) 转载地址 :type source: str :param cover: (optional) cover's URL, use method *cover_up* to get :type cover: str :param no_reprint: (optional) Is reprint allowed :type no_reprint: bool :param dynamic: 粉丝动态 :type dynamic: str :param open_elec: (optional) whether to open charging panel (充电面板) :type open_elec: bool :param open_subtitle: (optional) Is uploading subtitles allowed :type open_subtitle: bool :param max_retry: (optional) max retry times per chunk :type max_retry: int """ if len(title) > 80: raise Exception("标题长度超过80字") if len(source) > 200: raise Exception("转载地址长度超过200字") self.session.headers['Content-Type'] = 'application/json; charset=utf-8' if not isinstance(parts, list): parts = [parts] # retry by status retries = Retry( total=max_retry, backoff_factor=1, status_forcelist=(504, ), ) self.session.mount('https://', HTTPAdapter(max_retries=retries)) self.session.mount('http://', HTTPAdapter(max_retries=retries)) # videos = [] for part in parts: filepath = part.path filename = os.path.basename(filepath) filesize = os.path.getsize(filepath) r = self.session.get('https://member.bilibili.com/preupload?' 'os=upos&upcdn=ws&name={name}&size={size}&r=upos&profile=ugcupos%2Fyb&ssl=0' .format(name=parse.quote_plus(filename), size=filesize)) """return example { "upos_uri": "upos://ugc/i181012ws18x52mti3gg0h33chn3tyhp.mp4", "biz_id": 58993125, "endpoint": "//upos-hz-upcdnws.acgvideo.com", "endpoints": [ "//upos-hz-upcdnws.acgvideo.com", "//upos-hz-upcdntx.acgvideo.com" ], "chunk_retry_delay": 3, "chunk_retry": 200, "chunk_size": 4194304, "threads": 2, "timeout": 900, "auth": "os=upos&cdn=upcdnws&uid=&net_state=4&device=&build=&os_version=&ak=×tamp=&sign=", "OK": 1 } """ json = r.json() upos_uri = json['upos_uri'] endpoint = json['endpoint'] auth = json['auth'] biz_id = json['biz_id'] chunk_size = json['chunk_size'] self.session.headers['X-Upos-Auth'] = auth # add auth header r = self.session.post('https:{}/{}?uploads&output=json'.format(endpoint, upos_uri.replace('upos://', ''))) # {"upload_id":"72eb747b9650b8c7995fdb0efbdc2bb6","key":"\/i181012ws2wg1tb7tjzswk2voxrwlk1u.mp4","OK":1,"bucket":"ugc"} json = r.json() upload_id = json['upload_id'] with open(filepath, 'rb') as f: chunks_num = math.ceil(filesize / chunk_size) chunks_index = -1 while True: chunks_data = f.read(chunk_size) if not chunks_data: break chunks_index += 1 # start with 0 def upload_chunk(): r = self.session.put('https:{endpoint}/{upos_uri}?' 'partNumber={part_number}&uploadId={upload_id}&chunk={chunk}&chunks={chunks}&size={size}&start={start}&end={end}&total={total}' .format(endpoint=endpoint, upos_uri=upos_uri.replace('upos://', ''), part_number=chunks_index+1, # starts with 1 upload_id=upload_id, chunk=chunks_index, chunks=chunks_num, size=len(chunks_data), start=chunks_index * chunk_size, end=chunks_index * chunk_size + len(chunks_data), total=filesize, ), chunks_data, ) return r def retry_upload_chunk(): """return :class:`Response` if upload success, else return None.""" for i in range(max_retry): r = upload_chunk() if r.status_code == 200: return r log.info(r.text) log.info('{}/{} retry stage {}/{}'.format(chunks_index, chunks_num, i, max_retry)) log.info('sleep %ds', 5 * i) time.sleep(5 * i) return None r = retry_upload_chunk() if r: log.info('upload part {}/{}'.format(chunks_index, chunks_num)) else: raise Exception('upload reach max retry times at part {}/{}'.format(chunks_index, chunks_num)) # NOT DELETE! Refer to https://github.com/comwrg/bilibiliupload/issues/15#issuecomment-424379769 self.session.post('https:{endpoint}/{upos_uri}?' 'output=json&name={name}&profile=ugcupos%2Fyb&uploadId={upload_id}&biz_id={biz_id}' .format(endpoint=endpoint, upos_uri=upos_uri.replace('upos://', ''), name=filename, upload_id=upload_id, biz_id=biz_id, ), {"parts": [{"partNumber": i, "eTag": "etag"} for i in range(1, chunks_num+1)]}, ) videos.append({'filename': upos_uri.replace('upos://ugc/', '').split('.')[0], 'title' : part.title, 'desc' : part.desc}) # if source is empty, copyright=1, else copyright=2 copyright = 2 if source else 1 def add(): r = self.session.post('https://member.bilibili.com/x/vu/web/add?csrf=' + self.csrf, json={ "copyright" : copyright, "source" : source, "title" : title, "tid" : tid, "tag" : ','.join(tag), "no_reprint": int(no_reprint), "desc" : desc, "cover" : cover, "mission_id": 0, "order_id" : 0, "videos" : videos, "dtime" : dtime, "open_elec" : int(open_elec), "dynamic" : dynamic, "subtitle" : { "lan" : "", "open": int(open_subtitle), }, }, ) return r def retry_add(): for i in range(max_retry): r = add() json = r.json() code = json['code'] if code == 0: return r # {"code":20001,"message":"投稿服务异常","ttl":1} if code in (20001, ): log.info('retry add video {}/{}, {}'.format(i, max_retry, r.text)) else: raise Exception('Fail to add video, {}'.format(r.text)) log.info('sleep %ds', 5 * i) time.sleep(5 * i) raise Exception('Add video reach max retry times.') r = retry_add() return r.json()
[docs] def addChannel(self, name, intro=''): """ :param name: channel's name :type name: str :param intro: channel's introduction :type intro: str """ r = self.session.post( url='https://api.bilibili.com/x/space/channel/add', data={ 'name' : name, 'intro': intro, 'jsonp' : 'jsonp', 'csrf' : self.csrf, }, # name=123&intro=123&aids=&csrf=565d7ed17cef2cc8ad054210c4e64324&_=1497077610768 )
# return # {"status":true,"data":{"cid":"15812"}}
[docs] def channel_addVideo(self, cid, aids): """ :param cid: channel's id :type cid: int :param aids: videos' id :type aids: list<int> """ r = self.session.post( url='https://api.bilibili.com/x/space/channel/video/add', data={ 'aids': '%2C'.join(aids), 'cid' : cid, 'csrf': self.csrf, }, # aids=9953555%2C9872953&cid=15814&csrf=565d7ed17cef2cc8ad054210c4e64324&_=1497079332679 )
[docs] def cover_up(self, img: Union[str, BufferedReader]): """ :param img: img path or stream :return: img URL """ if isinstance(img, str): f = open(img, 'rb') else: f = img self.session.headers['Content-Type'] = 'application/x-www-form-urlencoded' r = self.session.post( url='https://member.bilibili.com/x/vu/web/cover/up', data={ 'cover': b'data:image/jpeg;base64,' + (base64.b64encode(f.read())), 'csrf': self.csrf, }, ) # {"code":0,"data":{"url":"http://i0.hdslb.com/bfs/archive/67db4a6eae398c309244e74f6e85ae8d813bd7c9.jpg"},"message":"","ttl":1} return r.json()['data']['url']