Skip to content

Commit

Permalink
Feature all in one (v1.1.0) (#4)
Browse files Browse the repository at this point in the history
Feature:
1、打包为exe可执行文件并包含了ffmpeg,省去了python和ffmpeg安装步骤,开箱即用。
2、监控多个直播间只需要运行一次,减少内存占用。
3、优化控制台输出,更加明晰。
Co-authored-by: weichenzhao <[email protected]>
  • Loading branch information
ZhaoWeicheng98 authored Jan 26, 2021
1 parent 5eaa647 commit 36964a5
Show file tree
Hide file tree
Showing 18 changed files with 481 additions and 258 deletions.
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -127,5 +127,4 @@ dmypy.json
data/
log/
config/*
!config/example.spec.json
!config/root_config.json
!config/config.json
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"python.pythonPath": "C:\\Users\\12050\\AppData\\Local\\Programs\\Python\\Python39\\python.exe"
}
25 changes: 12 additions & 13 deletions BaseLive.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import abc
import datetime
import logging

import traceback
import requests
import urllib3
from requests.adapters import HTTPAdapter
Expand All @@ -10,13 +10,9 @@


class BaseLive(metaclass=abc.ABCMeta):
__last_check_time = datetime.datetime.now(
)+datetime.timedelta(seconds=-60)
__allowed_check_interval = datetime.timedelta(
seconds=60)
__live_status = False

def __init__(self, config: dict):

default_headers = {
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Accept-Encoding': 'gzip, deflate',
Expand All @@ -31,7 +27,10 @@ def __init__(self, config: dict):
self.site_name = ''
self.site_domain = ''
self.config = config
BaseLive.__allowed_check_interval = datetime.timedelta(
self.__last_check_time = datetime.datetime.now(
)+datetime.timedelta(seconds=-config['root']['check_interval'])
self.__live_status = False
self.__allowed_check_interval = datetime.timedelta(
seconds=config['root']['check_interval'])

def common_request(self, method: str, url: str, params: dict = None, data: dict = None) -> requests.Response:
Expand All @@ -45,7 +44,7 @@ def common_request(self, method: str, url: str, params: dict = None, data: dict
url, headers=self.headers, params=params, data=data, verify=False, timeout=5)
return connection
except requests.exceptions.RequestException as e:
logging.error(self.generate_log("Request Error"+str(e)))
logging.error(self.generate_log("Request Error"+str(e)+traceback.format_exc()))

@abc.abstractmethod
def get_room_info(self):
Expand All @@ -67,17 +66,17 @@ def __check_live_status(self) -> bool:

@property
def live_status(self) -> bool:
if datetime.datetime.now()-BaseLive.__last_check_time >= BaseLive.__allowed_check_interval:
if datetime.datetime.now()-self.__last_check_time >= self.__allowed_check_interval:
logging.debug(self.generate_log("允许检查"))
BaseLive.__live_status = self.__check_live_status()
BaseLive.__last_check_time = datetime.datetime.now()
self.__live_status = self.__check_live_status()
self.__last_check_time = datetime.datetime.now()
else:
logging.debug(self.generate_log("间隔不足,使用过去状态"))
return BaseLive.__live_status
return self.__live_status

@live_status.setter
def live_status(self, status: bool):
BaseLive.__live_status = status
self.__live_status = status

def generate_log(self, content: str = '') -> str:
return f"[Site:{self.site_name} Room:{self.room_id}] {content}"
1 change: 1 addition & 0 deletions BiliLive.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class BiliLive(BaseLive):
def __init__(self, config: dict):
super().__init__(config)
Expand Down
7 changes: 4 additions & 3 deletions BiliLiveRecorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import os
import re
import traceback

import requests
import urllib3
Expand All @@ -14,9 +15,9 @@

class BiliLiveRecorder(BiliLive):
def __init__(self, config: dict, global_start: datetime.datetime):
super().__init__(config)
BiliLive.__init__(self,config)
self.record_dir = utils.init_record_dir(
self.room_id, global_start, config['root']['global_path']['data_path'])
self.room_id, global_start, config['root']['data_path'])

def record(self, record_url: str, output_filename: str) -> None:
try:
Expand Down Expand Up @@ -60,4 +61,4 @@ def run(self) -> None:
break
except Exception as e:
logging.error(self.generate_log(
'Error while checking or recording:' + str(e)))
'Error while checking or recording:' + str(e)+traceback.format_exc()))
16 changes: 8 additions & 8 deletions BiliVideoChecker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@
import logging
import os
import time

import threading
import requests
import urllib3

import utils

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class BiliVideoChecker:
class BiliVideoChecker(threading.Thread):
def __init__(self, bvid: str, path: str, config: dict):
threading.Thread.__init__(self)
default_headers = {
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Accept-Encoding': 'gzip, deflate',
Expand All @@ -38,7 +38,7 @@ def common_request(self, method: str, url: str, params: dict = None, data: dict
url, headers=self.headers, params=params, data=data, verify=False)
return connection

def check(self) -> None:
def run(self) -> None:
logging.basicConfig(level=utils.get_log_level(self.config),
format='%(asctime)s %(thread)d %(threadName)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%a, %d %b %Y %H:%M:%S',
Expand All @@ -50,11 +50,11 @@ def check(self) -> None:
}).json()
try:
if video_info['code'] == 0 and video_info['data']['state'] == 0:
logging.info("稿件%s 已开放浏览,准备删除 %s",self.bvid, self.path)
logging.info("稿件%s 已开放浏览,准备删除 %s", self.bvid, self.path)
utils.del_files_and_dir(self.path)
return
else:
logging.info("稿件%s 未开放浏览", self.bvid)
time.sleep(self.check_interval)
except KeyError:
pass
finally:
logging.info("稿件%s 未开放浏览",self.bvid)
time.sleep(self.check_interval)
17 changes: 10 additions & 7 deletions DanmuRecorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@
import logging
import os
import zlib

from aiowebsocket.converses import AioWebSocket

import traceback
import utils
from BiliLive import BiliLive


class BiliDanmuRecorder(BiliLive):
def __init__(self, config: dict, global_start: datetime.datetime):
super().__init__(config)
BiliLive.__init__(self, config)
self.log_filename = utils.init_danmu_log_file(
self.room_id, global_start, config['root']['global_path']['data_path'])
self.room_id, global_start, config['root']['data_path'])
self.room_server_api = 'wss://broadcastlv.chat.bilibili.com/sub'

def __log_danmu(self, body: str) -> None:
Expand Down Expand Up @@ -64,8 +63,12 @@ def run(self):
handlers=[logging.FileHandler(os.path.join(self.config['root']['logger']['log_path'], "DanmuRecoder_"+datetime.datetime.now(
).strftime('%Y-%m-%d_%H-%M-%S')+'.log'), "a", encoding="utf-8")])
try:
_ = open(self.log_filename, 'a', encoding="utf-8") # 提前创建弹幕记录文件避免因为没有弹幕而失败
asyncio.get_event_loop().run_until_complete(self.__startup())
# 提前创建弹幕记录文件避免因为没有弹幕而失败
_ = open(self.log_filename, 'a', encoding="utf-8")
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
loop = asyncio.get_event_loop()
loop.run_until_complete(self.__startup())
except KeyboardInterrupt:
logging.info(self.generate_log("键盘指令退出"))

Expand Down Expand Up @@ -123,4 +126,4 @@ def __printDM(self, data):
logging.info(self.generate_log('[OTHER] '+jd['cmd']))
except Exception as e:
logging.error(self.generate_log(
'Error while parsing danmu data:'+str(e)))
'Error while parsing danmu data:'+str(e)+traceback.format_exc()))
101 changes: 101 additions & 0 deletions MainRunner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import _thread
import datetime
import logging
import threading
import time
import traceback
from multiprocessing import Process, Value

import utils
from BiliLive import BiliLive
from BiliLiveRecorder import BiliLiveRecorder
from BiliVideoChecker import BiliVideoChecker
from DanmuRecorder import BiliDanmuRecorder
from Processor import Processor
from Uploader import Uploader


class MainRunner():
def __init__(self, config):
self.config = config
self.prev_live_status = False
self.current_state = Value(
'i', int(utils.state.WAITING_FOR_LIVE_START))
self.state_change_time = Value('f', time.time())
if self.config['root']['enable_baiduyun']:
from bypy import ByPy
_ = ByPy()
self.bl = BiliLive(self.config)
self.blr = None
self.bdr = None

def proc(self, config: dict, record_dir: str, danmu_path: str, current_state, state_change_time) -> None:
p = Processor(config, record_dir, danmu_path)
p.run()

if config['spec']['uploader']['record']['upload_record'] or config['spec']['uploader']['clips']['upload_clips']:
current_state.value = int(utils.state.UPLOADING_TO_BILIBILI)
state_change_time.value = time.time()
u = Uploader(p.outputs_dir, p.splits_dir, config)
d = u.upload(p.global_start)
if not config['spec']['uploader']['record']['keep_record_after_upload'] and d.get("record", None) is not None:
rc = BiliVideoChecker(d['record']['bvid'],
p.splits_dir, config)
rc.start()
if not config['spec']['uploader']['clips']['keep_clips_after_upload'] and d.get("clips", None) is not None:
cc = BiliVideoChecker(d['clips']['bvid'],
p.outputs_dir, config)
cc.start()

if config['root']['enable_baiduyun'] and config['spec']['backup']:
current_state.value = int(utils.state.UPLOADING_TO_BAIDUYUN)
state_change_time.value = time.time()
from bypy import ByPy
bp = ByPy()
bp.upload(p.merged_file_path)

if current_state.value != int(utils.state.LIVE_STARTED):
current_state.value = int(utils.state.WAITING_FOR_LIVE_START)
state_change_time.value = time.time()

def run(self):
try:
while True:
if not self.prev_live_status and self.bl.live_status:
self.current_state.value = int(utils.state.LIVE_STARTED)
self.state_change_time.value = time.time()
self.prev_live_status = self.bl.live_status
start = datetime.datetime.now()
self.blr = BiliLiveRecorder(self.config, start)
self.bdr = BiliDanmuRecorder(self.config, start)
record_process = Process(
target=self.blr.run)
danmu_process = Process(
target=self.bdr.run)
danmu_process.start()
record_process.start()

record_process.join()
danmu_process.join()

self.current_state.value = int(utils.state.PROCESSING_RECORDS)
self.state_change_time.value = time.time()
self.prev_live_status = self.bl.live_status
proc_process = Process(target=self.proc, args=(
self.config, self.blr.record_dir, self.bdr.log_filename, self.current_state, self.state_change_time))
proc_process.start()
else:
time.sleep(self.config['root']['check_interval'])
except KeyboardInterrupt:
return
except Exception as e:
logging.error('Error in Mainrunner:' +
str(e)+traceback.format_exc())

class MainThreadRunner(threading.Thread):
def __init__(self, config):
threading.Thread.__init__(self)
self.mr = MainRunner(config)

def run(self):
self.mr.run()
23 changes: 11 additions & 12 deletions Processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,15 @@ def count(danmus: Dict[datetime.datetime, List[str]], live_start: datetime.datet
return return_dict


def flv2ts(input_file: str, output_file: str, ffmpeg_path: str = "ffmpeg") -> subprocess.CompletedProcess:
def flv2ts(input_file: str, output_file: str) -> subprocess.CompletedProcess:
ret = subprocess.run(
f"{ffmpeg_path} -y -fflags +discardcorrupt -i {input_file} -c copy -bsf:v h264_mp4toannexb -f mpegts {output_file}", shell=True, check=True)
f"ffmpeg -y -fflags +discardcorrupt -i {input_file} -c copy -bsf:v h264_mp4toannexb -f mpegts {output_file}", shell=True, check=True)
return ret


def concat(merge_conf_path: str, merged_file_path: str, ffmpeg_path: str = "ffmpeg") -> subprocess.CompletedProcess:
def concat(merge_conf_path: str, merged_file_path: str) -> subprocess.CompletedProcess:
ret = subprocess.run(
f"{ffmpeg_path} -y -f concat -safe 0 -i {merge_conf_path} -c copy -fflags +igndts -avoid_negative_ts make_zero {merged_file_path}", shell=True, check=True)
f"ffmpeg -y -f concat -safe 0 -i {merge_conf_path} -c copy -fflags +igndts -avoid_negative_ts make_zero {merged_file_path}", shell=True, check=True)
return ret


Expand All @@ -111,13 +111,13 @@ def __init__(self, config: Dict, record_dir: str, danmu_path: str):
self.global_start = utils.get_global_start_from_records(
self.record_dir)
self.merge_conf_path = utils.get_merge_conf_path(
self.room_id, self.global_start, config['root']['global_path']['data_path'])
self.room_id, self.global_start, config['root']['data_path'])
self.merged_file_path = utils.get_mergd_filename(
self.room_id, self.global_start, config['root']['global_path']['data_path'])
self.room_id, self.global_start, config['root']['data_path'])
self.outputs_dir = utils.init_outputs_dir(
self.room_id, self.global_start, config['root']['global_path']['data_path'])
self.room_id, self.global_start, config['root']['data_path'])
self.splits_dir = utils.init_splits_dir(
self.room_id, self.global_start, self.config['root']['global_path']['data_path'])
self.room_id, self.global_start, self.config['root']['data_path'])
self.times = []
self.live_start = self.global_start
self.live_duration = 0
Expand All @@ -137,7 +137,7 @@ def pre_concat(self) -> None:
ts_path = os.path.splitext(os.path.join(
self.record_dir, filename))[0]+".ts"
_ = flv2ts(os.path.join(
self.record_dir, filename), ts_path, self.config['root']['global_path']['ffmpeg_path'])
self.record_dir, filename), ts_path)
if not self.config['spec']['recorder']['keep_raw_record']:
os.remove(os.path.join(self.record_dir, filename))
# ts_path = os.path.join(self.record_dir, filename)
Expand All @@ -147,8 +147,7 @@ def pre_concat(self) -> None:
self.times.append((start_time, duration))
f.write(
f"file '{ts_path}'\n")
_ = concat(self.merge_conf_path, self.merged_file_path,
self.config['root']['global_path']['ffmpeg_path'])
_ = concat(self.merge_conf_path, self.merged_file_path)
self.times.sort(key=lambda x: x[0])
self.live_start = self.times[0][0]
self.live_duration = (
Expand Down Expand Up @@ -178,7 +177,7 @@ def split(self, split_interval: int = 3600) -> None:
shutil.copy2(self.merged_file_path, os.path.join(
self.splits_dir, f"{self.room_id}_{self.global_start.strftime('%Y-%m-%d_%H-%M-%S')}_0.mp4"))
return

duration = float(ffmpeg.probe(self.merged_file_path)
['format']['duration'])
num_splits = int(duration) // split_interval + 1
Expand Down
Loading

0 comments on commit 36964a5

Please sign in to comment.