Skip to content

Commit

Permalink
feat: 支持稍后再看的扫描与下载 (#131)
Browse files Browse the repository at this point in the history
* 暂存

* 写点

* feat: 支持稍后再看

* chore: 干掉 print
  • Loading branch information
amtoaer authored Jul 11, 2024
1 parent 4c5d1b6 commit c27d1a2
Show file tree
Hide file tree
Showing 14 changed files with 472 additions and 4 deletions.
4 changes: 4 additions & 0 deletions crates/bili_sync/src/adapter/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod collection;
mod favorite;
mod watch_later;

use std::collections::HashSet;
use std::path::Path;
Expand All @@ -11,12 +12,14 @@ pub use collection::collection_from;
pub use favorite::favorite_from;
use futures::Stream;
use sea_orm::DatabaseConnection;
use watch_later::watch_later_from;

use crate::bilibili::{BiliClient, CollectionItem, VideoInfo};

pub enum Args<'a> {
Favorite { fid: &'a str },
Collection { collection_item: &'a CollectionItem },
WatchLater,
}

pub async fn video_list_from<'a>(
Expand All @@ -28,6 +31,7 @@ pub async fn video_list_from<'a>(
match args {
Args::Favorite { fid } => favorite_from(fid, path, bili_client, connection).await,
Args::Collection { collection_item } => collection_from(collection_item, path, bili_client, connection).await,
Args::WatchLater => watch_later_from(path, bili_client, connection).await,
}
}

Expand Down
195 changes: 195 additions & 0 deletions crates/bili_sync/src/adapter/watch_later.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
use std::collections::HashSet;
use std::path::Path;
use std::pin::Pin;

use anyhow::Result;
use bili_sync_entity::*;
use bili_sync_migration::OnConflict;
use filenamify::filenamify;
use futures::Stream;
use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;
use sea_orm::{DatabaseConnection, QuerySelect, TransactionTrait};

use super::VideoListModel;
use crate::bilibili::{BiliClient, BiliError, Video, VideoInfo, WatchLater};
use crate::config::TEMPLATE;
use crate::utils::id_time_key;
use crate::utils::model::create_video_pages;
use crate::utils::status::Status;

pub async fn watch_later_from<'a>(
path: &Path,
bili_client: &'a BiliClient,
connection: &DatabaseConnection,
) -> Result<(Box<dyn VideoListModel>, Pin<Box<dyn Stream<Item = VideoInfo> + 'a>>)> {
let watch_later = WatchLater::new(bili_client);
watch_later::Entity::insert(watch_later::ActiveModel {
id: Set(1),
path: Set(path.to_string_lossy().to_string()),
..Default::default()
})
.on_conflict(
OnConflict::column(watch_later::Column::Id)
.update_column(watch_later::Column::Path)
.to_owned(),
)
.exec(connection)
.await?;
Ok((
Box::new(
watch_later::Entity::find()
.filter(watch_later::Column::Id.eq(1))
.one(connection)
.await?
.unwrap(),
),
Box::pin(watch_later.into_video_stream()),
))
}
use async_trait::async_trait;

#[async_trait]
impl VideoListModel for watch_later::Model {
async fn video_count(&self, connection: &DatabaseConnection) -> Result<u64> {
Ok(video::Entity::find()
.filter(video::Column::WatchLaterId.eq(self.id))
.count(connection)
.await?)
}

async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result<Vec<video::Model>> {
Ok(video::Entity::find()
.filter(
video::Column::WatchLaterId
.eq(self.id)
.and(video::Column::Valid.eq(true))
.and(video::Column::DownloadStatus.eq(0))
.and(video::Column::Category.eq(2))
.and(video::Column::SinglePage.is_null()),
)
.all(connection)
.await?)
}

async fn unhandled_video_pages(
&self,
connection: &DatabaseConnection,
) -> Result<Vec<(video::Model, Vec<page::Model>)>> {
Ok(video::Entity::find()
.filter(
video::Column::WatchLaterId
.eq(self.id)
.and(video::Column::Valid.eq(true))
.and(video::Column::DownloadStatus.lt(Status::handled()))
.and(video::Column::Category.eq(2))
.and(video::Column::SinglePage.is_not_null()),
)
.find_with_related(page::Entity)
.all(connection)
.await?)
}

async fn exist_labels(
&self,
videos_info: &[VideoInfo],
connection: &DatabaseConnection,
) -> Result<HashSet<String>> {
let bvids = videos_info.iter().map(|v| v.bvid().to_string()).collect::<Vec<_>>();
Ok(video::Entity::find()
.filter(
video::Column::WatchLaterId
.eq(self.id)
.and(video::Column::Bvid.is_in(bvids)),
)
.select_only()
.columns([video::Column::Bvid, video::Column::Favtime])
.into_tuple()
.all(connection)
.await?
.into_iter()
.map(|(bvid, time)| id_time_key(&bvid, &time))
.collect::<HashSet<_>>())
}

fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option<video::Model>) -> video::ActiveModel {
let mut video_model = video_info.to_model(base_model);
video_model.watch_later_id = Set(Some(self.id));
if let Some(fmt_args) = &video_info.to_fmt_args() {
video_model.path = Set(Path::new(&self.path)
.join(filenamify(
TEMPLATE
.render("video", fmt_args)
.unwrap_or_else(|_| video_info.bvid().to_string()),
))
.to_string_lossy()
.to_string());
}
video_model
}

async fn fetch_videos_detail(
&self,
bili_clent: &BiliClient,
videos_model: Vec<video::Model>,
connection: &DatabaseConnection,
) -> Result<()> {
for video_model in videos_model {
let video = Video::new(bili_clent, video_model.bvid.clone());
let info: Result<_> = async { Ok((video.get_tags().await?, video.get_pages().await?)) }.await;
match info {
Ok((tags, pages_info)) => {
let txn = connection.begin().await?;
// 将分页信息写入数据库
create_video_pages(&pages_info, &video_model, &txn).await?;
// 将页标记和 tag 写入数据库
let mut video_active_model: video::ActiveModel = video_model.into();
video_active_model.single_page = Set(Some(pages_info.len() == 1));
video_active_model.tags = Set(Some(serde_json::to_value(tags).unwrap()));
video_active_model.save(&txn).await?;
txn.commit().await?;
}
Err(e) => {
error!(
"获取视频 {} - {} 的详细信息失败,错误为:{}",
&video_model.bvid, &video_model.name, e
);
if let Some(BiliError::RequestFailed(-404, _)) = e.downcast_ref::<BiliError>() {
let mut video_active_model: video::ActiveModel = video_model.into();
video_active_model.valid = Set(false);
video_active_model.save(connection).await?;
}
continue;
}
};
}
Ok(())
}

fn log_fetch_video_start(&self) {
info!("开始获取稍后再看的视频与分页信息...");
}

fn log_fetch_video_end(&self) {
info!("获取稍后再看的视频与分页信息完成");
}

fn log_download_video_start(&self) {
info!("开始下载稍后再看中所有未处理过的视频...");
}

fn log_download_video_end(&self) {
info!("下载稍后再看中未处理过的视频完成");
}

fn log_refresh_video_start(&self) {
info!("开始扫描稍后再看的新视频...");
}

fn log_refresh_video_end(&self, got_count: usize, new_count: u64) {
info!(
"扫描稍后再看的新视频完成,获取了 {} 条新视频,其中有 {} 条新视频",
got_count, new_count,
);
}
}
2 changes: 1 addition & 1 deletion crates/bili_sync/src/bilibili/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ impl<'a> Collection<'a> {
break;
},
};
for video_info in videos_info.into_iter(){
for video_info in videos_info{
yield video_info;
}
let fields = match self.collection.collection_type{
Expand Down
2 changes: 1 addition & 1 deletion crates/bili_sync/src/bilibili/favorite_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl<'a> FavoriteList<'a> {
break;
},
};
for video_info in videos_info.into_iter(){
for video_info in videos_info{
yield video_info;
}
if videos["data"]["has_more"].is_boolean() && videos["data"]["has_more"].as_bool().unwrap(){
Expand Down
52 changes: 52 additions & 0 deletions crates/bili_sync/src/bilibili/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub use error::BiliError;
pub use favorite_list::FavoriteList;
use favorite_list::Upper;
pub use video::{Dimension, PageInfo, Video};
pub use watch_later::WatchLater;

mod analyzer;
mod client;
Expand All @@ -19,6 +20,7 @@ mod danmaku;
mod error;
mod favorite_list;
mod video;
mod watch_later;

pub(crate) trait Validate {
type Output;
Expand Down Expand Up @@ -81,6 +83,24 @@ pub enum VideoInfo {
pubtime: DateTime<Utc>,
attr: i32,
},
/// 从稍后再看中获取的视频信息
WatchLater {
title: String,
bvid: String,
#[serde(rename = "desc")]
intro: String,
#[serde(rename = "pic")]
cover: String,
#[serde(rename = "owner")]
upper: Upper,
#[serde(with = "ts_seconds")]
ctime: DateTime<Utc>,
#[serde(rename = "add_at", with = "ts_seconds")]
fav_time: DateTime<Utc>,
#[serde(rename = "pubdate", with = "ts_seconds")]
pubtime: DateTime<Utc>,
state: i32,
},
/// 从视频列表中获取的视频信息
Simple {
bvid: String,
Expand All @@ -92,3 +112,35 @@ pub enum VideoInfo {
pubtime: DateTime<Utc>,
},
}

#[cfg(test)]
mod tests {
use futures::{pin_mut, StreamExt};

use super::*;

#[ignore = "only for manual test"]
#[tokio::test]
async fn assert_video_info_type() {
let bili_client = BiliClient::new();
let video = Video::new(&bili_client, "BV1Z54y1C7ZB".to_string());
assert!(matches!(video.get_view_info().await, Ok(VideoInfo::View { .. })));
let collection_item = CollectionItem {
mid: "521722088".to_string(),
sid: "387214".to_string(),
collection_type: CollectionType::Series,
};
let collection = Collection::new(&bili_client, &collection_item);
let stream = collection.into_simple_video_stream();
pin_mut!(stream);
assert!(matches!(stream.next().await, Some(VideoInfo::Simple { .. })));
let favorite = FavoriteList::new(&bili_client, "3084505258".to_string());
let stream = favorite.into_video_stream();
pin_mut!(stream);
assert!(matches!(stream.next().await, Some(VideoInfo::Detail { .. })));
let watch_later = WatchLater::new(&bili_client);
let stream = watch_later.into_video_stream();
pin_mut!(stream);
assert!(matches!(stream.next().await, Some(VideoInfo::WatchLater { .. })));
}
}
48 changes: 48 additions & 0 deletions crates/bili_sync/src/bilibili/watch_later.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use anyhow::Result;
use async_stream::stream;
use futures::Stream;
use serde_json::Value;

use crate::bilibili::{BiliClient, Validate, VideoInfo};
pub struct WatchLater<'a> {
client: &'a BiliClient,
}

impl<'a> WatchLater<'a> {
pub fn new(client: &'a BiliClient) -> Self {
Self { client }
}

async fn get_videos(&self) -> Result<Value> {
self.client
.request(reqwest::Method::GET, "https://api.bilibili.com/x/v2/history/toview")
.send()
.await?
.error_for_status()?
.json::<serde_json::Value>()
.await?
.validate()
}

pub fn into_video_stream(self) -> impl Stream<Item = VideoInfo> + 'a {
stream! {
let Ok(mut videos) = self.get_videos().await else {
error!("Failed to get watch later list");
return;
};
if !videos["data"]["list"].is_array() {
error!("Watch later list is not an array");
}
let videos_info = match serde_json::from_value::<Vec<VideoInfo>>(videos["data"]["list"].take()) {
Ok(v) => v,
Err(e) => {
error!("Failed to parse watch later list: {}", e);
return;
}
};
for video in videos_info {
yield video;
}
}
}
}
Loading

0 comments on commit c27d1a2

Please sign in to comment.