Skip to content

API Documentation

__description__ = 'A useful CLI tool for downloading posts in Kemono.party / .su' module-attribute

__title__ = 'KToolBox' module-attribute

__version__ = 'v0.12.0' module-attribute

__main__

main()

Source code in ktoolbox/__main__.py
 8
 9
10
11
12
13
14
def main():
    try:
        logger_init(cli_use=True)
        uvloop_init()
        fire.Fire(KToolBoxCli)
    except KeyboardInterrupt:
        logger.error("KToolBox was interrupted by the user")

action

ActionRet

Bases: BaseRet[_T]

Return data model of action call

Source code in ktoolbox/action/base.py
10
11
12
class ActionRet(BaseRet[_T]):
    """Return data model of action call"""
    pass

FetchInterruptError

Bases: Exception

Exception for interrupt of data fetching

Source code in ktoolbox/action/fetch.py
11
12
13
14
15
16
class FetchInterruptError(Exception):
    """Exception for interrupt of data fetching"""

    def __init__(self, *args, ret: BaseRet = None):
        super().__init__(*args)
        self.ret = ret

ret = ret instance-attribute

__init__(*args, ret=None)

Source code in ktoolbox/action/fetch.py
14
15
16
def __init__(self, *args, ret: BaseRet = None):
    super().__init__(*args)
    self.ret = ret

create_job_from_creator(service, creator_id, path, *, all_pages=False, offset=0, length=50, save_creator_indices=False, mix_posts=None, start_time, end_time) async

Create a list of download job from a creator

Parameters:

Name Type Description Default
service str

The service where the post is located

required
creator_id str

The ID of the creator

required
path Path

The path for downloading posts, which needs to be sanitized

required
all_pages bool

Fetch all posts, offset and length will be ignored if enabled

False
offset int

Result offset (or start offset)

0
length Optional[int]

The number of posts to fetch

50
save_creator_indices bool

Record CreatorIndices data.

False
mix_posts bool

Save all files from different posts at same path, save_creator_indices will be ignored if enabled

None
start_time Optional[datetime]

Start time of the time range

required
end_time Optional[datetime]

End time of the time range

required
Source code in ktoolbox/action/job.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
async def create_job_from_creator(
        service: str,
        creator_id: str,
        path: Path,
        *,
        all_pages: bool = False,
        offset: int = 0,
        length: Optional[int] = 50,
        save_creator_indices: bool = False,
        mix_posts: bool = None,
        start_time: Optional[datetime],
        end_time: Optional[datetime]
) -> ActionRet[List[Job]]:
    """
    Create a list of download job from a creator

    :param service: The service where the post is located
    :param creator_id: The ID of the creator
    :param path: The path for downloading posts, which needs to be sanitized
    :param all_pages: Fetch all posts, ``offset`` and ``length`` will be ignored if enabled
    :param offset: Result offset (or start offset)
    :param length: The number of posts to fetch
    :param save_creator_indices: Record ``CreatorIndices`` data.
    :param mix_posts: Save all files from different posts at same path, \
     ``save_creator_indices`` will be ignored if enabled
    :param start_time: Start time of the time range
    :param end_time: End time of the time range
    """
    mix_posts = config.job.mix_posts if mix_posts is None else mix_posts

    # Get posts
    logger.info(f"Start fetching posts from creator {creator_id}")
    post_list: List[Post] = []
    start_offset = offset - offset % 50
    if all_pages:
        page_counter = count()
    else:
        page_num = length // 50 + 1
        page_counter = iter(range(page_num))

    try:
        async for part in fetch_creator_posts(service=service, creator_id=creator_id, o=start_offset):
            if next(page_counter, None) is not None:
                post_list += part
            else:
                break
    except FetchInterruptError as e:
        return ActionRet(**e.ret.model_dump(mode="python"))

    if not all_pages:
        post_list = post_list[offset % 50:][:length]
    else:
        post_list = post_list[offset % 50:]

    # Filter posts by publish time
    if start_time or end_time:
        post_list = list(filter_posts_by_date(post_list, start_time, end_time))
    logger.info(f"Get {len(post_list)} posts, start creating jobs")

    # Filter posts and generate ``CreatorIndices``
    if not mix_posts:
        if save_creator_indices:
            indices = CreatorIndices(
                creator_id=creator_id,
                service=service,
                posts={post.id: post for post in post_list},
                posts_path={post.id: path / sanitize_filename(post.title) for post in post_list}
            )
            async with aiofiles.open(
                    path / DataStorageNameEnum.CreatorIndicesData.value,
                    "w",
                    encoding="utf-8"
            ) as f:
                await f.write(indices.model_dump_json(indent=config.json_dump_indent))

    job_list: List[Job] = []
    for post in post_list:
        # Get post path
        post_path = path if mix_posts else path / generate_post_path_name(post)

        # Generate jobs
        job_list += await create_job_from_post(
            post=post,
            post_path=post_path,
            post_structure=False if mix_posts else None,
            dump_post_data=not mix_posts
        )
    return ActionRet(data=job_list)

create_job_from_post(post, post_path, *, post_structure=None, dump_post_data=True) async

Create a list of download job from a post data

Parameters:

Name Type Description Default
post Post

post data

required
post_path Path

Path of the post directory, which needs to be sanitized

required
post_structure Union[PostStructureConfiguration, bool]

post path structure, False -> disable, True & None -> config.job.post_structure

None
dump_post_data bool

Whether to dump post data (post.json) in post directory

True
Source code in ktoolbox/action/job.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
async def create_job_from_post(
        post: Post,
        post_path: Path,
        *,
        post_structure: Union[PostStructureConfiguration, bool] = None,
        dump_post_data: bool = True
) -> List[Job]:
    """
    Create a list of download job from a post data

    :param post: post data
    :param post_path: Path of the post directory, which needs to be sanitized
    :param post_structure: post path structure, ``False`` -> disable, \
     ``True`` & ``None`` -> ``config.job.post_structure``
    :param dump_post_data: Whether to dump post data (post.json) in post directory
    """
    post_path.mkdir(exist_ok=True)

    # Load ``PostStructureConfiguration``
    if post_structure in [True, None]:
        post_structure = config.job.post_structure
    if post_structure:
        attachments_path = post_path / post_structure.attachments  # attachments
        attachments_path.mkdir(exist_ok=True)
        content_path = post_path / post_structure.content_filepath  # content
        content_path.parent.mkdir(exist_ok=True)
    else:
        attachments_path = post_path
        content_path = None

    # Filter and create jobs for ``Post.attachment``
    jobs: List[Job] = []
    for i, attachment in enumerate(post.attachments):  # type: int, Attachment
        if not attachment.path:
            continue
        file_path_obj = Path(attachment.name) if is_valid_filename(attachment.name) else Path(
            urlparse(attachment.path).path
        )
        if (not config.job.allow_list or any(
                map(
                    lambda x: fnmatch(file_path_obj.name, x),
                    config.job.allow_list
                )
        )) and not any(
            map(
                lambda x: fnmatch(file_path_obj.name, x),
                config.job.block_list
            )
        ):
            basic_filename = f"{i + 1}{file_path_obj.suffix}" if config.job.sequential_filename else file_path_obj.name
            alt_filename = generate_filename(post, basic_filename)
            jobs.append(
                Job(
                    path=attachments_path,
                    alt_filename=alt_filename,
                    server_path=attachment.path,
                    type=PostFileTypeEnum.Attachment
                )
            )

    # Filter and create jobs for ``Post.file``
    if post.file and post.file.path:
        post_file_name = Path(post.file.name) if is_valid_filename(post.file.name) else Path(
            urlparse(post.file.path).path
        )
        if (not config.job.allow_list or any(
                map(
                    lambda x: fnmatch(post_file_name.name, x),
                    config.job.allow_list
                )
        )) and not any(
            map(
                lambda x: fnmatch(post_file_name.name, x),
                config.job.block_list
            )
        ):
            jobs.append(
                Job(
                    path=post_path,
                    alt_filename=f"{post.id}_{post_file_name.name}",
                    server_path=post.file.path,
                    type=PostFileTypeEnum.File
                )
            )

    # Write content file
    if content_path and post.content:
        async with aiofiles.open(content_path, "w", encoding=config.downloader.encoding) as f:
            await f.write(post.content)
    if dump_post_data:
        async with aiofiles.open(str(post_path / DataStorageNameEnum.PostData.value), "w", encoding="utf-8") as f:
            await f.write(
                post.model_dump_json(indent=config.json_dump_indent)
            )

    return jobs

fetch_creator_posts(service, creator_id, o=0) async

Fetch posts from a creator

Parameters:

Name Type Description Default
service str

The service where the post is located

required
creator_id str

The ID of the creator

required
o int

Result offset, stepping of 50 is enforced

0

Returns:

Type Description
AsyncGenerator[List[Post], Any]

Async generator of several list of posts

Raises:

Type Description
FetchInterruptError

Exception for interrupt of data fetching

Source code in ktoolbox/action/fetch.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
async def fetch_creator_posts(service: str, creator_id: str, o: int = 0) -> AsyncGenerator[List[Post], Any]:
    """
    Fetch posts from a creator

    :param service: The service where the post is located
    :param creator_id: The ID of the creator
    :param o: Result offset, stepping of 50 is enforced
    :return: Async generator of several list of posts
    :raise FetchInterruptError: Exception for interrupt of data fetching
    """
    while True:
        ret = await get_creator_post(service=service, creator_id=creator_id, o=o)
        if ret:
            yield ret.data
            if len(ret.data) < SEARCH_STEP:
                break
            else:
                o += SEARCH_STEP
        else:
            raise FetchInterruptError(ret=ret)

filter_posts_by_date(post_list, start_date, end_date)

Filter posts by publish date range

Parameters:

Name Type Description Default
post_list List[Post]

List of posts

required
start_date Optional[datetime]

Start time of the time range

required
end_date Optional[datetime]

End time of the time range

required
Source code in ktoolbox/action/utils.py
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def filter_posts_by_date(
        post_list: List[Post],
        start_date: Optional[datetime],
        end_date: Optional[datetime]
) -> Generator[Post, Any, Any]:
    """
    Filter posts by publish date range

    :param post_list: List of posts
    :param start_date: Start time of the time range
    :param end_date: End time of the time range
    """
    post_filter = filter(lambda x: _match_post_date(x, start_date, end_date), post_list)
    yield from post_filter

filter_posts_by_indices(posts, indices)

Compare and filter posts by CreatorIndices data

Only keep posts that was edited after last download.

Parameters:

Name Type Description Default
posts List[Post]

Posts to filter

required
indices CreatorIndices

CreatorIndices data to use

required

Returns:

Type Description
Tuple[List[Post], CreatorIndices]

A updated List[Post] and updated new CreatorIndices instance

Source code in ktoolbox/action/utils.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def filter_posts_by_indices(posts: List[Post], indices: CreatorIndices) -> Tuple[List[Post], CreatorIndices]:
    """
    Compare and filter posts by ``CreatorIndices`` data

    Only keep posts that was edited after last download.

    :param posts: Posts to filter
    :param indices: ``CreatorIndices`` data to use
    :return: A updated ``List[Post]`` and updated **new** ``CreatorIndices`` instance
    """
    new_list = list(
        filter(
            lambda x: x.id not in indices.posts or x.edited > indices.posts[x.id].edited, posts
        )
    )
    new_indices = indices.model_copy(deep=True)
    for post in new_list:
        new_indices.posts[post.id] = post
    return new_list, new_indices

generate_filename(post, basic_name)

Generate download filename

Source code in ktoolbox/action/utils.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def generate_filename(post: Post, basic_name: str) -> str:
    """Generate download filename"""
    basic_name_suffix = Path(basic_name).suffix
    basic_name_filename = basic_name.split(basic_name_suffix)[0] if basic_name_suffix else basic_name
    try:
        return sanitize_filename(
            config.job.filename_format.format(
                basic_name_filename,
                id=post.id,
                user=post.user,
                service=post.service,
                title=post.title,
                added=post.added.strftime(TIME_FORMAT) if post.added else "",
                published=post.published.strftime(TIME_FORMAT) if post.published else "",
                edited=post.edited.strftime(TIME_FORMAT) if post.edited else ""
            ) + basic_name_suffix
        )
    except KeyError as e:
        logger.error(f"`JobConfiguration.filename_format` contains invalid key: {e}")
        exit(1)

generate_post_path_name(post)

Generate directory name for post to save.

Source code in ktoolbox/action/utils.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def generate_post_path_name(post: Post) -> str:
    """Generate directory name for post to save."""
    if not post.title:
        return post.id
    else:
        try:
            return sanitize_filename(
                config.job.post_dirname_format.format(
                    id=post.id,
                    user=post.user,
                    service=post.service,
                    title=post.title,
                    added=post.added.strftime(TIME_FORMAT) if post.added else "",
                    published=post.published.strftime(TIME_FORMAT) if post.published else "",
                    edited=post.edited.strftime(TIME_FORMAT) if post.edited else ""
                )
            )
        except KeyError as e:
            logger.error(f"`JobConfiguration.post_dirname_format` contains invalid key: {e}")
            exit(1)

search_creator(id=None, name=None, service=None) async

Search creator with multiple keywords support.

Parameters:

Name Type Description Default
id str

The ID of the creator

None
name str

The name of the creator

None
service str

The service for the creator

None
Source code in ktoolbox/action/search.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
async def search_creator(id: str = None, name: str = None, service: str = None) -> BaseRet[Iterator[Creator]]:
    """
    Search creator with multiple keywords support.

    :param id: The ID of the creator
    :param name: The name of the creator
    :param service: The service for the creator
    """

    def filter_func(creator: Creator):
        """Filter creators with attributes"""
        if id is not None and creator.id != id:
            return False
        if name is not None and name not in creator.name:
            return False
        if service is not None and creator.service != service:
            return False
        return True

    ret = await get_creators()
    if not ret:
        base_ret = BaseRet.model_validate(ret.model_dump())
        base_ret.data = iter([])
        return base_ret
    creators = ret.data
    return ActionRet(data=iter(filter(filter_func, creators)))

search_creator_post(id=None, name=None, service=None, q=None, o=None) async

Search posts from creator with multiple keywords support.

Parameters:

Name Type Description Default
id str

The ID of the creator

None
name str

The name of the creator

None
service str

The service for the creator

None
q str

Search query

None
o str

Result offset, stepping of 50 is enforced

None
Source code in ktoolbox/action/search.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
async def search_creator_post(
        id: str = None,
        name: str = None,
        service: str = None,
        q: str = None,
        o: str = None
) -> BaseRet[List[Post]]:
    """
    Search posts from creator with multiple keywords support.

    :param id: The ID of the creator
    :param name: The name of the creator
    :param service: The service for the creator
    :param q: Search query
    :param o: Result offset, stepping of 50 is enforced
    """

    async def inner(**kwargs):
        posts: List[Post] = []
        if any([id, name, service]):
            if id is not None and service:  # ``get_creator_post`` required
                ret = await get_creator_post(
                    service=service,
                    creator_id=id,
                    q=q,
                    o=o
                )
                return ActionRet(data=ret.data) if ret else ret
            else:  # else need to get ``id`` and ``service``
                creators_ret = await search_creator(id=id, name=name, service=service)
                if not creators_ret:
                    return ActionRet(**creators_ret.model_dump(mode="python"))
                else:
                    for creator in creators_ret.data:
                        ret = await get_creator_post(
                            service=creator.service,
                            creator_id=creator.id,
                            q=q,
                            o=o
                        )
                        if ret:
                            posts += ret.data
                    return ActionRet(data=posts)
        else:
            return ActionRet(
                code=RetCodeEnum.MissingParameter,
                message=generate_msg(
                    "Missing `id`, `name`, `service` parameter, at least given one of them.",
                    **kwargs
                )
            )

    return await inner(id=id, name=name, service=service, q=q, o=o)

base

__all__ = ['ActionRet'] module-attribute

ActionRet

Bases: BaseRet[_T]

Return data model of action call

Source code in ktoolbox/action/base.py
10
11
12
class ActionRet(BaseRet[_T]):
    """Return data model of action call"""
    pass

fetch

__all__ = ['FetchInterruptError', 'fetch_creator_posts'] module-attribute

FetchInterruptError

Bases: Exception

Exception for interrupt of data fetching

Source code in ktoolbox/action/fetch.py
11
12
13
14
15
16
class FetchInterruptError(Exception):
    """Exception for interrupt of data fetching"""

    def __init__(self, *args, ret: BaseRet = None):
        super().__init__(*args)
        self.ret = ret
ret = ret instance-attribute
__init__(*args, ret=None)
Source code in ktoolbox/action/fetch.py
14
15
16
def __init__(self, *args, ret: BaseRet = None):
    super().__init__(*args)
    self.ret = ret

fetch_creator_posts(service, creator_id, o=0) async

Fetch posts from a creator

Parameters:

Name Type Description Default
service str

The service where the post is located

required
creator_id str

The ID of the creator

required
o int

Result offset, stepping of 50 is enforced

0

Returns:

Type Description
AsyncGenerator[List[Post], Any]

Async generator of several list of posts

Raises:

Type Description
FetchInterruptError

Exception for interrupt of data fetching

Source code in ktoolbox/action/fetch.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
async def fetch_creator_posts(service: str, creator_id: str, o: int = 0) -> AsyncGenerator[List[Post], Any]:
    """
    Fetch posts from a creator

    :param service: The service where the post is located
    :param creator_id: The ID of the creator
    :param o: Result offset, stepping of 50 is enforced
    :return: Async generator of several list of posts
    :raise FetchInterruptError: Exception for interrupt of data fetching
    """
    while True:
        ret = await get_creator_post(service=service, creator_id=creator_id, o=o)
        if ret:
            yield ret.data
            if len(ret.data) < SEARCH_STEP:
                break
            else:
                o += SEARCH_STEP
        else:
            raise FetchInterruptError(ret=ret)

job

__all__ = ['create_job_from_post', 'create_job_from_creator'] module-attribute

create_job_from_creator(service, creator_id, path, *, all_pages=False, offset=0, length=50, save_creator_indices=False, mix_posts=None, start_time, end_time) async

Create a list of download job from a creator

Parameters:

Name Type Description Default
service str

The service where the post is located

required
creator_id str

The ID of the creator

required
path Path

The path for downloading posts, which needs to be sanitized

required
all_pages bool

Fetch all posts, offset and length will be ignored if enabled

False
offset int

Result offset (or start offset)

0
length Optional[int]

The number of posts to fetch

50
save_creator_indices bool

Record CreatorIndices data.

False
mix_posts bool

Save all files from different posts at same path, save_creator_indices will be ignored if enabled

None
start_time Optional[datetime]

Start time of the time range

required
end_time Optional[datetime]

End time of the time range

required
Source code in ktoolbox/action/job.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
async def create_job_from_creator(
        service: str,
        creator_id: str,
        path: Path,
        *,
        all_pages: bool = False,
        offset: int = 0,
        length: Optional[int] = 50,
        save_creator_indices: bool = False,
        mix_posts: bool = None,
        start_time: Optional[datetime],
        end_time: Optional[datetime]
) -> ActionRet[List[Job]]:
    """
    Create a list of download job from a creator

    :param service: The service where the post is located
    :param creator_id: The ID of the creator
    :param path: The path for downloading posts, which needs to be sanitized
    :param all_pages: Fetch all posts, ``offset`` and ``length`` will be ignored if enabled
    :param offset: Result offset (or start offset)
    :param length: The number of posts to fetch
    :param save_creator_indices: Record ``CreatorIndices`` data.
    :param mix_posts: Save all files from different posts at same path, \
     ``save_creator_indices`` will be ignored if enabled
    :param start_time: Start time of the time range
    :param end_time: End time of the time range
    """
    mix_posts = config.job.mix_posts if mix_posts is None else mix_posts

    # Get posts
    logger.info(f"Start fetching posts from creator {creator_id}")
    post_list: List[Post] = []
    start_offset = offset - offset % 50
    if all_pages:
        page_counter = count()
    else:
        page_num = length // 50 + 1
        page_counter = iter(range(page_num))

    try:
        async for part in fetch_creator_posts(service=service, creator_id=creator_id, o=start_offset):
            if next(page_counter, None) is not None:
                post_list += part
            else:
                break
    except FetchInterruptError as e:
        return ActionRet(**e.ret.model_dump(mode="python"))

    if not all_pages:
        post_list = post_list[offset % 50:][:length]
    else:
        post_list = post_list[offset % 50:]

    # Filter posts by publish time
    if start_time or end_time:
        post_list = list(filter_posts_by_date(post_list, start_time, end_time))
    logger.info(f"Get {len(post_list)} posts, start creating jobs")

    # Filter posts and generate ``CreatorIndices``
    if not mix_posts:
        if save_creator_indices:
            indices = CreatorIndices(
                creator_id=creator_id,
                service=service,
                posts={post.id: post for post in post_list},
                posts_path={post.id: path / sanitize_filename(post.title) for post in post_list}
            )
            async with aiofiles.open(
                    path / DataStorageNameEnum.CreatorIndicesData.value,
                    "w",
                    encoding="utf-8"
            ) as f:
                await f.write(indices.model_dump_json(indent=config.json_dump_indent))

    job_list: List[Job] = []
    for post in post_list:
        # Get post path
        post_path = path if mix_posts else path / generate_post_path_name(post)

        # Generate jobs
        job_list += await create_job_from_post(
            post=post,
            post_path=post_path,
            post_structure=False if mix_posts else None,
            dump_post_data=not mix_posts
        )
    return ActionRet(data=job_list)

create_job_from_post(post, post_path, *, post_structure=None, dump_post_data=True) async

Create a list of download job from a post data

Parameters:

Name Type Description Default
post Post

post data

required
post_path Path

Path of the post directory, which needs to be sanitized

required
post_structure Union[PostStructureConfiguration, bool]

post path structure, False -> disable, True & None -> config.job.post_structure

None
dump_post_data bool

Whether to dump post data (post.json) in post directory

True
Source code in ktoolbox/action/job.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
async def create_job_from_post(
        post: Post,
        post_path: Path,
        *,
        post_structure: Union[PostStructureConfiguration, bool] = None,
        dump_post_data: bool = True
) -> List[Job]:
    """
    Create a list of download job from a post data

    :param post: post data
    :param post_path: Path of the post directory, which needs to be sanitized
    :param post_structure: post path structure, ``False`` -> disable, \
     ``True`` & ``None`` -> ``config.job.post_structure``
    :param dump_post_data: Whether to dump post data (post.json) in post directory
    """
    post_path.mkdir(exist_ok=True)

    # Load ``PostStructureConfiguration``
    if post_structure in [True, None]:
        post_structure = config.job.post_structure
    if post_structure:
        attachments_path = post_path / post_structure.attachments  # attachments
        attachments_path.mkdir(exist_ok=True)
        content_path = post_path / post_structure.content_filepath  # content
        content_path.parent.mkdir(exist_ok=True)
    else:
        attachments_path = post_path
        content_path = None

    # Filter and create jobs for ``Post.attachment``
    jobs: List[Job] = []
    for i, attachment in enumerate(post.attachments):  # type: int, Attachment
        if not attachment.path:
            continue
        file_path_obj = Path(attachment.name) if is_valid_filename(attachment.name) else Path(
            urlparse(attachment.path).path
        )
        if (not config.job.allow_list or any(
                map(
                    lambda x: fnmatch(file_path_obj.name, x),
                    config.job.allow_list
                )
        )) and not any(
            map(
                lambda x: fnmatch(file_path_obj.name, x),
                config.job.block_list
            )
        ):
            basic_filename = f"{i + 1}{file_path_obj.suffix}" if config.job.sequential_filename else file_path_obj.name
            alt_filename = generate_filename(post, basic_filename)
            jobs.append(
                Job(
                    path=attachments_path,
                    alt_filename=alt_filename,
                    server_path=attachment.path,
                    type=PostFileTypeEnum.Attachment
                )
            )

    # Filter and create jobs for ``Post.file``
    if post.file and post.file.path:
        post_file_name = Path(post.file.name) if is_valid_filename(post.file.name) else Path(
            urlparse(post.file.path).path
        )
        if (not config.job.allow_list or any(
                map(
                    lambda x: fnmatch(post_file_name.name, x),
                    config.job.allow_list
                )
        )) and not any(
            map(
                lambda x: fnmatch(post_file_name.name, x),
                config.job.block_list
            )
        ):
            jobs.append(
                Job(
                    path=post_path,
                    alt_filename=f"{post.id}_{post_file_name.name}",
                    server_path=post.file.path,
                    type=PostFileTypeEnum.File
                )
            )

    # Write content file
    if content_path and post.content:
        async with aiofiles.open(content_path, "w", encoding=config.downloader.encoding) as f:
            await f.write(post.content)
    if dump_post_data:
        async with aiofiles.open(str(post_path / DataStorageNameEnum.PostData.value), "w", encoding="utf-8") as f:
            await f.write(
                post.model_dump_json(indent=config.json_dump_indent)
            )

    return jobs

search

__all__ = ['search_creator', 'search_creator_post'] module-attribute

search_creator(id=None, name=None, service=None) async

Search creator with multiple keywords support.

Parameters:

Name Type Description Default
id str

The ID of the creator

None
name str

The name of the creator

None
service str

The service for the creator

None
Source code in ktoolbox/action/search.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
async def search_creator(id: str = None, name: str = None, service: str = None) -> BaseRet[Iterator[Creator]]:
    """
    Search creator with multiple keywords support.

    :param id: The ID of the creator
    :param name: The name of the creator
    :param service: The service for the creator
    """

    def filter_func(creator: Creator):
        """Filter creators with attributes"""
        if id is not None and creator.id != id:
            return False
        if name is not None and name not in creator.name:
            return False
        if service is not None and creator.service != service:
            return False
        return True

    ret = await get_creators()
    if not ret:
        base_ret = BaseRet.model_validate(ret.model_dump())
        base_ret.data = iter([])
        return base_ret
    creators = ret.data
    return ActionRet(data=iter(filter(filter_func, creators)))

search_creator_post(id=None, name=None, service=None, q=None, o=None) async

Search posts from creator with multiple keywords support.

Parameters:

Name Type Description Default
id str

The ID of the creator

None
name str

The name of the creator

None
service str

The service for the creator

None
q str

Search query

None
o str

Result offset, stepping of 50 is enforced

None
Source code in ktoolbox/action/search.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
async def search_creator_post(
        id: str = None,
        name: str = None,
        service: str = None,
        q: str = None,
        o: str = None
) -> BaseRet[List[Post]]:
    """
    Search posts from creator with multiple keywords support.

    :param id: The ID of the creator
    :param name: The name of the creator
    :param service: The service for the creator
    :param q: Search query
    :param o: Result offset, stepping of 50 is enforced
    """

    async def inner(**kwargs):
        posts: List[Post] = []
        if any([id, name, service]):
            if id is not None and service:  # ``get_creator_post`` required
                ret = await get_creator_post(
                    service=service,
                    creator_id=id,
                    q=q,
                    o=o
                )
                return ActionRet(data=ret.data) if ret else ret
            else:  # else need to get ``id`` and ``service``
                creators_ret = await search_creator(id=id, name=name, service=service)
                if not creators_ret:
                    return ActionRet(**creators_ret.model_dump(mode="python"))
                else:
                    for creator in creators_ret.data:
                        ret = await get_creator_post(
                            service=creator.service,
                            creator_id=creator.id,
                            q=q,
                            o=o
                        )
                        if ret:
                            posts += ret.data
                    return ActionRet(data=posts)
        else:
            return ActionRet(
                code=RetCodeEnum.MissingParameter,
                message=generate_msg(
                    "Missing `id`, `name`, `service` parameter, at least given one of them.",
                    **kwargs
                )
            )

    return await inner(id=id, name=name, service=service, q=q, o=o)

utils

TIME_FORMAT = '%Y-%m-%d' module-attribute

__all__ = ['generate_post_path_name', 'generate_filename', 'filter_posts_by_date', 'filter_posts_by_indices'] module-attribute

filter_posts_by_date(post_list, start_date, end_date)

Filter posts by publish date range

Parameters:

Name Type Description Default
post_list List[Post]

List of posts

required
start_date Optional[datetime]

Start time of the time range

required
end_date Optional[datetime]

End time of the time range

required
Source code in ktoolbox/action/utils.py
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def filter_posts_by_date(
        post_list: List[Post],
        start_date: Optional[datetime],
        end_date: Optional[datetime]
) -> Generator[Post, Any, Any]:
    """
    Filter posts by publish date range

    :param post_list: List of posts
    :param start_date: Start time of the time range
    :param end_date: End time of the time range
    """
    post_filter = filter(lambda x: _match_post_date(x, start_date, end_date), post_list)
    yield from post_filter

filter_posts_by_indices(posts, indices)

Compare and filter posts by CreatorIndices data

Only keep posts that was edited after last download.

Parameters:

Name Type Description Default
posts List[Post]

Posts to filter

required
indices CreatorIndices

CreatorIndices data to use

required

Returns:

Type Description
Tuple[List[Post], CreatorIndices]

A updated List[Post] and updated new CreatorIndices instance

Source code in ktoolbox/action/utils.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def filter_posts_by_indices(posts: List[Post], indices: CreatorIndices) -> Tuple[List[Post], CreatorIndices]:
    """
    Compare and filter posts by ``CreatorIndices`` data

    Only keep posts that was edited after last download.

    :param posts: Posts to filter
    :param indices: ``CreatorIndices`` data to use
    :return: A updated ``List[Post]`` and updated **new** ``CreatorIndices`` instance
    """
    new_list = list(
        filter(
            lambda x: x.id not in indices.posts or x.edited > indices.posts[x.id].edited, posts
        )
    )
    new_indices = indices.model_copy(deep=True)
    for post in new_list:
        new_indices.posts[post.id] = post
    return new_list, new_indices

generate_filename(post, basic_name)

Generate download filename

Source code in ktoolbox/action/utils.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def generate_filename(post: Post, basic_name: str) -> str:
    """Generate download filename"""
    basic_name_suffix = Path(basic_name).suffix
    basic_name_filename = basic_name.split(basic_name_suffix)[0] if basic_name_suffix else basic_name
    try:
        return sanitize_filename(
            config.job.filename_format.format(
                basic_name_filename,
                id=post.id,
                user=post.user,
                service=post.service,
                title=post.title,
                added=post.added.strftime(TIME_FORMAT) if post.added else "",
                published=post.published.strftime(TIME_FORMAT) if post.published else "",
                edited=post.edited.strftime(TIME_FORMAT) if post.edited else ""
            ) + basic_name_suffix
        )
    except KeyError as e:
        logger.error(f"`JobConfiguration.filename_format` contains invalid key: {e}")
        exit(1)

generate_post_path_name(post)

Generate directory name for post to save.

Source code in ktoolbox/action/utils.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def generate_post_path_name(post: Post) -> str:
    """Generate directory name for post to save."""
    if not post.title:
        return post.id
    else:
        try:
            return sanitize_filename(
                config.job.post_dirname_format.format(
                    id=post.id,
                    user=post.user,
                    service=post.service,
                    title=post.title,
                    added=post.added.strftime(TIME_FORMAT) if post.added else "",
                    published=post.published.strftime(TIME_FORMAT) if post.published else "",
                    edited=post.edited.strftime(TIME_FORMAT) if post.edited else ""
                )
            )
        except KeyError as e:
            logger.error(f"`JobConfiguration.post_dirname_format` contains invalid key: {e}")
            exit(1)

api

  • Kemono API version: 1.0.0

  • current App commit hash: 7ee4a7b18ee92a442c13950c05dc8236cfb14a60

APIRet

Bases: BaseRet[_T]

Return data model of API call

Source code in ktoolbox/api/base.py
62
63
64
class APIRet(BaseRet[_T]):
    """Return data model of API call"""
    pass

APITenacityStop

Bases: stop_base

APIs Stop strategies

Source code in ktoolbox/api/base.py
21
22
23
24
25
26
27
28
class APITenacityStop(stop_base):
    """APIs Stop strategies"""

    def __call__(self, retry_state: RetryCallState) -> bool:
        if config.api.retry_times is None:
            return stop_never(retry_state)
        else:
            return stop_after_attempt(config.api.retry_times)(retry_state)

__call__(retry_state)

Source code in ktoolbox/api/base.py
24
25
26
27
28
def __call__(self, retry_state: RetryCallState) -> bool:
    if config.api.retry_times is None:
        return stop_never(retry_state)
    else:
        return stop_after_attempt(config.api.retry_times)(retry_state)

BaseAPI

Bases: ABC, Generic[_T]

Source code in ktoolbox/api/base.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
class BaseAPI(ABC, Generic[_T]):
    path: str = "/"
    method: Literal["get", "post"]
    extra_validator: Optional[Callable[[str], BaseModel]] = None
    client = httpx.AsyncClient(verify=config.ssl_verify)

    Response = BaseModel
    """API response model"""

    @classmethod
    def handle_res(cls, res: httpx.Response) -> APIRet[_T]:
        """Handle API response"""
        try:
            if cls.extra_validator:
                res_model = cls.extra_validator(res.text)
            else:
                res_model = cls.Response.model_validate_json(res.text)
        except (ValueError, ValidationError) as e:
            if isinstance(e, ValueError):
                return APIRet(
                    code=RetCodeEnum.JsonDecodeError,
                    message=str(e),
                    exception=e
                )
            elif isinstance(e, ValidationError):
                return APIRet(
                    code=RetCodeEnum.ValidationError,
                    message=str(e),
                    exception=e
                )
        else:
            data = res_model.root if isinstance(res_model, RootModel) else res_model
            return APIRet(data=data)

    @classmethod
    @_retry
    async def request(cls, path: str = None, **kwargs) -> APIRet[_T]:
        """
        Make a request to the API
        :param path: Fully initialed URL path
        :param kwargs: Keyword arguments of ``httpx._client.AsyncClient.request``
        """
        if path is None:
            path = cls.path
        url_parts = [config.api.scheme, config.api.netloc, f"{config.api.path}{path}", '', '', '']
        url = str(urlunparse(url_parts))
        try:
            res = await cls.client.request(
                method=cls.method,
                url=url,
                timeout=config.api.timeout,
                follow_redirects=True,
                **kwargs
            )
        except Exception as e:
            return APIRet(
                code=RetCodeEnum.NetWorkError,
                message=str(e),
                exception=e
            )
        else:
            return cls.handle_res(res)

    @classmethod
    @abstractmethod
    async def __call__(cls, *args, **kwargs) -> APIRet[Response]:
        """Function to call API"""
        ...

Response = BaseModel class-attribute instance-attribute

API response model

client = httpx.AsyncClient(verify=config.ssl_verify) class-attribute instance-attribute

extra_validator: Optional[Callable[[str], BaseModel]] = None class-attribute instance-attribute

method: Literal['get', 'post'] instance-attribute

path: str = '/' class-attribute instance-attribute

__call__(*args, **kwargs) abstractmethod async classmethod

Function to call API

Source code in ktoolbox/api/base.py
130
131
132
133
134
@classmethod
@abstractmethod
async def __call__(cls, *args, **kwargs) -> APIRet[Response]:
    """Function to call API"""
    ...

handle_res(res) classmethod

Handle API response

Source code in ktoolbox/api/base.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@classmethod
def handle_res(cls, res: httpx.Response) -> APIRet[_T]:
    """Handle API response"""
    try:
        if cls.extra_validator:
            res_model = cls.extra_validator(res.text)
        else:
            res_model = cls.Response.model_validate_json(res.text)
    except (ValueError, ValidationError) as e:
        if isinstance(e, ValueError):
            return APIRet(
                code=RetCodeEnum.JsonDecodeError,
                message=str(e),
                exception=e
            )
        elif isinstance(e, ValidationError):
            return APIRet(
                code=RetCodeEnum.ValidationError,
                message=str(e),
                exception=e
            )
    else:
        data = res_model.root if isinstance(res_model, RootModel) else res_model
        return APIRet(data=data)

request(path=None, **kwargs) async classmethod

Make a request to the API

Parameters:

Name Type Description Default
path str

Fully initialed URL path

None
kwargs

Keyword arguments of httpx._client.AsyncClient.request

{}
Source code in ktoolbox/api/base.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
@classmethod
@_retry
async def request(cls, path: str = None, **kwargs) -> APIRet[_T]:
    """
    Make a request to the API
    :param path: Fully initialed URL path
    :param kwargs: Keyword arguments of ``httpx._client.AsyncClient.request``
    """
    if path is None:
        path = cls.path
    url_parts = [config.api.scheme, config.api.netloc, f"{config.api.path}{path}", '', '', '']
    url = str(urlunparse(url_parts))
    try:
        res = await cls.client.request(
            method=cls.method,
            url=url,
            timeout=config.api.timeout,
            follow_redirects=True,
            **kwargs
        )
    except Exception as e:
        return APIRet(
            code=RetCodeEnum.NetWorkError,
            message=str(e),
            exception=e
        )
    else:
        return cls.handle_res(res)

base

__all__ = ['APITenacityStop', 'APIRet', 'BaseAPI'] module-attribute

APIRet

Bases: BaseRet[_T]

Return data model of API call

Source code in ktoolbox/api/base.py
62
63
64
class APIRet(BaseRet[_T]):
    """Return data model of API call"""
    pass

APITenacityStop

Bases: stop_base

APIs Stop strategies

Source code in ktoolbox/api/base.py
21
22
23
24
25
26
27
28
class APITenacityStop(stop_base):
    """APIs Stop strategies"""

    def __call__(self, retry_state: RetryCallState) -> bool:
        if config.api.retry_times is None:
            return stop_never(retry_state)
        else:
            return stop_after_attempt(config.api.retry_times)(retry_state)
__call__(retry_state)
Source code in ktoolbox/api/base.py
24
25
26
27
28
def __call__(self, retry_state: RetryCallState) -> bool:
    if config.api.retry_times is None:
        return stop_never(retry_state)
    else:
        return stop_after_attempt(config.api.retry_times)(retry_state)

BaseAPI

Bases: ABC, Generic[_T]

Source code in ktoolbox/api/base.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
class BaseAPI(ABC, Generic[_T]):
    path: str = "/"
    method: Literal["get", "post"]
    extra_validator: Optional[Callable[[str], BaseModel]] = None
    client = httpx.AsyncClient(verify=config.ssl_verify)

    Response = BaseModel
    """API response model"""

    @classmethod
    def handle_res(cls, res: httpx.Response) -> APIRet[_T]:
        """Handle API response"""
        try:
            if cls.extra_validator:
                res_model = cls.extra_validator(res.text)
            else:
                res_model = cls.Response.model_validate_json(res.text)
        except (ValueError, ValidationError) as e:
            if isinstance(e, ValueError):
                return APIRet(
                    code=RetCodeEnum.JsonDecodeError,
                    message=str(e),
                    exception=e
                )
            elif isinstance(e, ValidationError):
                return APIRet(
                    code=RetCodeEnum.ValidationError,
                    message=str(e),
                    exception=e
                )
        else:
            data = res_model.root if isinstance(res_model, RootModel) else res_model
            return APIRet(data=data)

    @classmethod
    @_retry
    async def request(cls, path: str = None, **kwargs) -> APIRet[_T]:
        """
        Make a request to the API
        :param path: Fully initialed URL path
        :param kwargs: Keyword arguments of ``httpx._client.AsyncClient.request``
        """
        if path is None:
            path = cls.path
        url_parts = [config.api.scheme, config.api.netloc, f"{config.api.path}{path}", '', '', '']
        url = str(urlunparse(url_parts))
        try:
            res = await cls.client.request(
                method=cls.method,
                url=url,
                timeout=config.api.timeout,
                follow_redirects=True,
                **kwargs
            )
        except Exception as e:
            return APIRet(
                code=RetCodeEnum.NetWorkError,
                message=str(e),
                exception=e
            )
        else:
            return cls.handle_res(res)

    @classmethod
    @abstractmethod
    async def __call__(cls, *args, **kwargs) -> APIRet[Response]:
        """Function to call API"""
        ...
Response = BaseModel class-attribute instance-attribute

API response model

client = httpx.AsyncClient(verify=config.ssl_verify) class-attribute instance-attribute
extra_validator: Optional[Callable[[str], BaseModel]] = None class-attribute instance-attribute
method: Literal['get', 'post'] instance-attribute
path: str = '/' class-attribute instance-attribute
__call__(*args, **kwargs) abstractmethod async classmethod

Function to call API

Source code in ktoolbox/api/base.py
130
131
132
133
134
@classmethod
@abstractmethod
async def __call__(cls, *args, **kwargs) -> APIRet[Response]:
    """Function to call API"""
    ...
handle_res(res) classmethod

Handle API response

Source code in ktoolbox/api/base.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@classmethod
def handle_res(cls, res: httpx.Response) -> APIRet[_T]:
    """Handle API response"""
    try:
        if cls.extra_validator:
            res_model = cls.extra_validator(res.text)
        else:
            res_model = cls.Response.model_validate_json(res.text)
    except (ValueError, ValidationError) as e:
        if isinstance(e, ValueError):
            return APIRet(
                code=RetCodeEnum.JsonDecodeError,
                message=str(e),
                exception=e
            )
        elif isinstance(e, ValidationError):
            return APIRet(
                code=RetCodeEnum.ValidationError,
                message=str(e),
                exception=e
            )
    else:
        data = res_model.root if isinstance(res_model, RootModel) else res_model
        return APIRet(data=data)
request(path=None, **kwargs) async classmethod

Make a request to the API

Parameters:

Name Type Description Default
path str

Fully initialed URL path

None
kwargs

Keyword arguments of httpx._client.AsyncClient.request

{}
Source code in ktoolbox/api/base.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
@classmethod
@_retry
async def request(cls, path: str = None, **kwargs) -> APIRet[_T]:
    """
    Make a request to the API
    :param path: Fully initialed URL path
    :param kwargs: Keyword arguments of ``httpx._client.AsyncClient.request``
    """
    if path is None:
        path = cls.path
    url_parts = [config.api.scheme, config.api.netloc, f"{config.api.path}{path}", '', '', '']
    url = str(urlunparse(url_parts))
    try:
        res = await cls.client.request(
            method=cls.method,
            url=url,
            timeout=config.api.timeout,
            follow_redirects=True,
            **kwargs
        )
    except Exception as e:
        return APIRet(
            code=RetCodeEnum.NetWorkError,
            message=str(e),
            exception=e
        )
    else:
        return cls.handle_res(res)

misc

get_app_version = GetAppVersion.__call__ module-attribute

Show current App commit hash

GetAppVersion

Bases: BaseAPI

Source code in ktoolbox/api/misc/get_app_version.py
 8
 9
10
11
12
13
14
15
16
17
18
19
class GetAppVersion(BaseAPI):
    path = "/app_version"
    method = "get"

    class Response(RootModel[str]):
        root: str

    extra_validator = Response.model_validate_strings

    @classmethod
    async def __call__(cls) -> APIRet[str]:
        return await cls.request()
extra_validator = Response.model_validate_strings class-attribute instance-attribute
method = 'get' class-attribute instance-attribute
path = '/app_version' class-attribute instance-attribute
Response

Bases: RootModel[str]

Source code in ktoolbox/api/misc/get_app_version.py
12
13
class Response(RootModel[str]):
    root: str
root: str instance-attribute
__call__() async classmethod
Source code in ktoolbox/api/misc/get_app_version.py
17
18
19
@classmethod
async def __call__(cls) -> APIRet[str]:
    return await cls.request()

model

Announcement

Bases: BaseModel

Source code in ktoolbox/api/model/announcement.py
 9
10
11
12
13
14
15
16
17
class Announcement(BaseModel):
    service: Optional[str] = None
    user_id: Optional[str] = None
    hash: Optional[str] = None
    """sha256"""
    content: Optional[str] = None
    added: Optional[datetime] = None
    # noinspection SpellCheckingInspection
    """isoformat UTC"""
added: Optional[datetime] = None class-attribute instance-attribute

isoformat UTC

content: Optional[str] = None class-attribute instance-attribute
hash: Optional[str] = None class-attribute instance-attribute

sha256

service: Optional[str] = None class-attribute instance-attribute
user_id: Optional[str] = None class-attribute instance-attribute

Attachment

Bases: BaseModel

Source code in ktoolbox/api/model/post.py
16
17
18
class Attachment(BaseModel):
    name: Optional[str] = None
    path: Optional[str] = None
name: Optional[str] = None class-attribute instance-attribute
path: Optional[str] = None class-attribute instance-attribute

Creator

Bases: BaseModel

Source code in ktoolbox/api/model/creator.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Creator(BaseModel):
    # noinspection SpellCheckingInspection
    favorited: int
    # noinspection SpellCheckingInspection
    """The number of times this creator has been favorited"""
    id: str
    """The ID of the creator"""
    indexed: datetime
    """Timestamp when the creator was indexed, Unix time as integer"""
    name: str
    """The name of the creator"""
    service: str
    """The service for the creator"""
    updated: datetime
    """Timestamp when the creator was last updated, Unix time as integer"""
favorited: int instance-attribute

The number of times this creator has been favorited

id: str instance-attribute

The ID of the creator

indexed: datetime instance-attribute

Timestamp when the creator was indexed, Unix time as integer

name: str instance-attribute

The name of the creator

service: str instance-attribute

The service for the creator

updated: datetime instance-attribute

Timestamp when the creator was last updated, Unix time as integer

File

Bases: BaseModel

Source code in ktoolbox/api/model/post.py
11
12
13
class File(BaseModel):
    name: Optional[str] = None
    path: Optional[str] = None
name: Optional[str] = None class-attribute instance-attribute
path: Optional[str] = None class-attribute instance-attribute

Post

Bases: BaseModel

Source code in ktoolbox/api/model/post.py
21
22
23
24
25
26
27
28
29
30
31
32
33
class Post(BaseModel):
    id: Optional[str] = None
    user: Optional[str] = None
    service: Optional[str] = None
    title: Optional[str] = None
    content: Optional[str] = None
    embed: Optional[Dict[str, Any]] = None
    shared_file: Optional[bool] = None
    added: Optional[datetime] = None
    published: Optional[datetime] = None
    edited: Optional[datetime] = None
    file: Optional[File] = None
    attachments: Optional[List[Attachment]] = None
added: Optional[datetime] = None class-attribute instance-attribute
attachments: Optional[List[Attachment]] = None class-attribute instance-attribute
content: Optional[str] = None class-attribute instance-attribute
edited: Optional[datetime] = None class-attribute instance-attribute
embed: Optional[Dict[str, Any]] = None class-attribute instance-attribute
file: Optional[File] = None class-attribute instance-attribute
id: Optional[str] = None class-attribute instance-attribute
published: Optional[datetime] = None class-attribute instance-attribute
service: Optional[str] = None class-attribute instance-attribute
shared_file: Optional[bool] = None class-attribute instance-attribute
title: Optional[str] = None class-attribute instance-attribute
user: Optional[str] = None class-attribute instance-attribute

announcement

__all__ = ['Announcement'] module-attribute
Announcement

Bases: BaseModel

Source code in ktoolbox/api/model/announcement.py
 9
10
11
12
13
14
15
16
17
class Announcement(BaseModel):
    service: Optional[str] = None
    user_id: Optional[str] = None
    hash: Optional[str] = None
    """sha256"""
    content: Optional[str] = None
    added: Optional[datetime] = None
    # noinspection SpellCheckingInspection
    """isoformat UTC"""
added: Optional[datetime] = None class-attribute instance-attribute

isoformat UTC

content: Optional[str] = None class-attribute instance-attribute
hash: Optional[str] = None class-attribute instance-attribute

sha256

service: Optional[str] = None class-attribute instance-attribute
user_id: Optional[str] = None class-attribute instance-attribute

creator

__all__ = ['Creator'] module-attribute
Creator

Bases: BaseModel

Source code in ktoolbox/api/model/creator.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Creator(BaseModel):
    # noinspection SpellCheckingInspection
    favorited: int
    # noinspection SpellCheckingInspection
    """The number of times this creator has been favorited"""
    id: str
    """The ID of the creator"""
    indexed: datetime
    """Timestamp when the creator was indexed, Unix time as integer"""
    name: str
    """The name of the creator"""
    service: str
    """The service for the creator"""
    updated: datetime
    """Timestamp when the creator was last updated, Unix time as integer"""
favorited: int instance-attribute

The number of times this creator has been favorited

id: str instance-attribute

The ID of the creator

indexed: datetime instance-attribute

Timestamp when the creator was indexed, Unix time as integer

name: str instance-attribute

The name of the creator

service: str instance-attribute

The service for the creator

updated: datetime instance-attribute

Timestamp when the creator was last updated, Unix time as integer

post

__all__ = ['File', 'Attachment', 'Post'] module-attribute
Attachment

Bases: BaseModel

Source code in ktoolbox/api/model/post.py
16
17
18
class Attachment(BaseModel):
    name: Optional[str] = None
    path: Optional[str] = None
name: Optional[str] = None class-attribute instance-attribute
path: Optional[str] = None class-attribute instance-attribute
File

Bases: BaseModel

Source code in ktoolbox/api/model/post.py
11
12
13
class File(BaseModel):
    name: Optional[str] = None
    path: Optional[str] = None
name: Optional[str] = None class-attribute instance-attribute
path: Optional[str] = None class-attribute instance-attribute
Post

Bases: BaseModel

Source code in ktoolbox/api/model/post.py
21
22
23
24
25
26
27
28
29
30
31
32
33
class Post(BaseModel):
    id: Optional[str] = None
    user: Optional[str] = None
    service: Optional[str] = None
    title: Optional[str] = None
    content: Optional[str] = None
    embed: Optional[Dict[str, Any]] = None
    shared_file: Optional[bool] = None
    added: Optional[datetime] = None
    published: Optional[datetime] = None
    edited: Optional[datetime] = None
    file: Optional[File] = None
    attachments: Optional[List[Attachment]] = None
added: Optional[datetime] = None class-attribute instance-attribute
attachments: Optional[List[Attachment]] = None class-attribute instance-attribute
content: Optional[str] = None class-attribute instance-attribute
edited: Optional[datetime] = None class-attribute instance-attribute
embed: Optional[Dict[str, Any]] = None class-attribute instance-attribute
file: Optional[File] = None class-attribute instance-attribute
id: Optional[str] = None class-attribute instance-attribute
published: Optional[datetime] = None class-attribute instance-attribute
service: Optional[str] = None class-attribute instance-attribute
shared_file: Optional[bool] = None class-attribute instance-attribute
title: Optional[str] = None class-attribute instance-attribute
user: Optional[str] = None class-attribute instance-attribute

posts

get_announcement = GetAnnouncement.__call__ module-attribute

get_creator_post = GetCreatorPost.__call__ module-attribute

get_creators = GetCreators.__call__ module-attribute

get_post = GetPost.__call__ module-attribute

GetAnnouncement

Bases: BaseAPI

Source code in ktoolbox/api/posts/get_announcement.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class GetAnnouncement(BaseAPI):
    path = "/{service}/user/{creator_id}/announcements"
    method = "get"

    class Response(RootModel[List[Announcement]]):
        root: List[Announcement]

    @classmethod
    async def __call__(cls, service: str, creator_id: str) -> APIRet[List[Announcement]]:
        """
        Get creator announcements

        :param service: The service name
        :param creator_id: The creator's ID
        """
        return await cls.request(path=cls.path.format(service=service, creator_id=creator_id))
method = 'get' class-attribute instance-attribute
path = '/{service}/user/{creator_id}/announcements' class-attribute instance-attribute
Response

Bases: RootModel[List[Announcement]]

Source code in ktoolbox/api/posts/get_announcement.py
15
16
class Response(RootModel[List[Announcement]]):
    root: List[Announcement]
root: List[Announcement] instance-attribute
__call__(service, creator_id) async classmethod

Get creator announcements

Parameters:

Name Type Description Default
service str

The service name

required
creator_id str

The creator's ID

required
Source code in ktoolbox/api/posts/get_announcement.py
18
19
20
21
22
23
24
25
26
@classmethod
async def __call__(cls, service: str, creator_id: str) -> APIRet[List[Announcement]]:
    """
    Get creator announcements

    :param service: The service name
    :param creator_id: The creator's ID
    """
    return await cls.request(path=cls.path.format(service=service, creator_id=creator_id))

GetCreatorPost

Bases: BaseAPI

Source code in ktoolbox/api/posts/get_creator_post.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class GetCreatorPost(BaseAPI):
    path = "/{service}/user/{creator_id}"
    method = "get"

    class Response(RootModel[List[Post]]):
        root: List[Post]

    @classmethod
    async def __call__(cls, service: str, creator_id: str, *, q: str = None, o: int = None) -> APIRet[List[Post]]:
        """
        Get a list of creator posts

        :param service: The service where the post is located
        :param creator_id: The ID of the creator
        :param q: Search query
        :param o: Result offset, stepping of 50 is enforced
        """
        return await cls.request(
            path=cls.path.format(service=service, creator_id=creator_id),
            params={
                "q": q,
                "o": o
            }
        )
method = 'get' class-attribute instance-attribute
path = '/{service}/user/{creator_id}' class-attribute instance-attribute
Response

Bases: RootModel[List[Post]]

Source code in ktoolbox/api/posts/get_creator_post.py
15
16
class Response(RootModel[List[Post]]):
    root: List[Post]
root: List[Post] instance-attribute
__call__(service, creator_id, *, q=None, o=None) async classmethod

Get a list of creator posts

Parameters:

Name Type Description Default
service str

The service where the post is located

required
creator_id str

The ID of the creator

required
q str

Search query

None
o int

Result offset, stepping of 50 is enforced

None
Source code in ktoolbox/api/posts/get_creator_post.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@classmethod
async def __call__(cls, service: str, creator_id: str, *, q: str = None, o: int = None) -> APIRet[List[Post]]:
    """
    Get a list of creator posts

    :param service: The service where the post is located
    :param creator_id: The ID of the creator
    :param q: Search query
    :param o: Result offset, stepping of 50 is enforced
    """
    return await cls.request(
        path=cls.path.format(service=service, creator_id=creator_id),
        params={
            "q": q,
            "o": o
        }
    )

GetCreators

Bases: BaseAPI

List All Creators

Source code in ktoolbox/api/posts/get_creators.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class GetCreators(BaseAPI):
    """List All Creators"""
    path = "/creators.txt"
    method = "get"

    class Response(RootModel[List[Creator]]):
        root: List[Creator]

    @classmethod
    async def __call__(cls) -> APIRet[List[Creator]]:
        """
        List of all creators

        List all creators with details. I blame DDG for .txt.
        """
        return await cls.request()
method = 'get' class-attribute instance-attribute
path = '/creators.txt' class-attribute instance-attribute
Response

Bases: RootModel[List[Creator]]

Source code in ktoolbox/api/posts/get_creators.py
16
17
class Response(RootModel[List[Creator]]):
    root: List[Creator]
root: List[Creator] instance-attribute
__call__() async classmethod

List of all creators

List all creators with details. I blame DDG for .txt.

Source code in ktoolbox/api/posts/get_creators.py
19
20
21
22
23
24
25
26
@classmethod
async def __call__(cls) -> APIRet[List[Creator]]:
    """
    List of all creators

    List all creators with details. I blame DDG for .txt.
    """
    return await cls.request()

GetPost

Bases: BaseAPI

Source code in ktoolbox/api/posts/get_post.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class GetPost(BaseAPI):
    path = "/{service}/user/{creator_id}/post/{post_id}"
    method = "get"

    class Response(BaseModel):
        post: Post

    @classmethod
    async def __call__(cls, service: str, creator_id: str, post_id: str) -> APIRet[Response]:
        """
        Get a specific post

        :param service: The service name
        :param creator_id: The creator's ID
        :param post_id: The post ID
        """
        return await cls.request(
            path=cls.path.format(
                service=service,
                creator_id=creator_id,
                post_id=post_id
            )
        )
method = 'get' class-attribute instance-attribute
path = '/{service}/user/{creator_id}/post/{post_id}' class-attribute instance-attribute
Response

Bases: BaseModel

Source code in ktoolbox/api/posts/get_post.py
13
14
class Response(BaseModel):
    post: Post
post: Post instance-attribute
__call__(service, creator_id, post_id) async classmethod

Get a specific post

Parameters:

Name Type Description Default
service str

The service name

required
creator_id str

The creator's ID

required
post_id str

The post ID

required
Source code in ktoolbox/api/posts/get_post.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@classmethod
async def __call__(cls, service: str, creator_id: str, post_id: str) -> APIRet[Response]:
    """
    Get a specific post

    :param service: The service name
    :param creator_id: The creator's ID
    :param post_id: The post ID
    """
    return await cls.request(
        path=cls.path.format(
            service=service,
            creator_id=creator_id,
            post_id=post_id
        )
    )

utils

SEARCH_STEP = 50 module-attribute

Searching APIs result steps

__all__ = ['SEARCH_STEP', 'get_creator_icon', 'get_creator_banner'] module-attribute

get_creator_banner(creator_id, service)

Get the creator banner for a given creator ID and service.

Returns:

Type Description
str

The banner URL.

Source code in ktoolbox/api/utils.py
21
22
23
24
25
26
27
28
def get_creator_banner(creator_id: str, service: str) -> str:
    """
    Get the creator banner for a given creator ID and service.

    :return: The banner URL.
    """
    url_parts = [config.api.scheme, config.api.statics_netloc, f"/banners/{service}/{creator_id}", '', '', '']
    return str(urlunparse(url_parts))

get_creator_icon(creator_id, service)

Get the creator icon for a given creator ID and service.

Returns:

Type Description
str

The icon URL.

Source code in ktoolbox/api/utils.py
11
12
13
14
15
16
17
18
def get_creator_icon(creator_id: str, service: str) -> str:
    """
    Get the creator icon for a given creator ID and service.

    :return: The icon URL.
    """
    url_parts = [config.api.scheme, config.api.statics_netloc, f"/icons/{service}/{creator_id}", '', '', '']
    return str(urlunparse(url_parts))

cli

__all__ = ['KToolBoxCli'] module-attribute

KToolBoxCli

Source code in ktoolbox/cli.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
class KToolBoxCli:
    @staticmethod
    async def version():
        """Show KToolBox version"""
        return __version__

    @staticmethod
    async def site_version():
        # noinspection SpellCheckingInspection
        """Show current Kemono site app commit hash"""
        logger.info(repr(config))
        ret = await get_app_version()
        return ret.data if ret else ret.message

    @staticmethod
    async def config_editor():
        """Launch graphical KToolBox configuration editor"""
        try:
            from ktoolbox.editor import run_config_editor
            run_config_editor()
        except ModuleNotFoundError:
            logger.error(
                "You need to install extra dependencies to use the editor, "
                "run `pip install ktoolbox[urwid]` "
                "or `pipx install ktoolbox[urwid] --force` if you are using pipx"
            )

    @staticmethod
    async def example_env():
        """Generate an example configuration ``.env`` file."""
        print(
            render(
                OutputFormat.DOTENV,
                class_path=("ktoolbox.configuration.Configuration",)
            )
        )

    # noinspection PyShadowingBuiltins
    @staticmethod
    async def search_creator(
            name: str = None,
            id: str = None,
            service: str = None,
            *,
            dump: Path = None
    ):
        """
        Search creator, you can use multiple parameters as keywords.

        :param id: The ID of the creator
        :param name: The name of the creator
        :param service: The service for the creator
        :param dump: Dump the result to a JSON file
        """
        logger.info(repr(config))
        ret = await search_creator_action(id=id, name=name, service=service)
        if ret:
            result_list = list(ret.data)
            if dump:
                await dump_search(result_list, dump)
            return result_list or TextEnum.SearchResultEmpty.value
        else:
            return ret.message

    # noinspection PyShadowingBuiltins
    @staticmethod
    async def search_creator_post(
            id: str = None,
            name: str = None,
            service: str = None,
            q: str = None,
            o: int = None,
            *,
            dump: Path = None
    ):
        """
        Search posts from creator, you can use multiple parameters as keywords.

        :param id: The ID of the creator
        :param name: The name of the creator
        :param service: The service for the creator
        :param q: Search query
        :param o: Result offset, stepping of 50 is enforced
        :param dump: Dump the result to a JSON file
        """
        logger.info(repr(config))
        ret = await search_creator_post_action(id=id, name=name, service=service, q=q, o=o)
        if ret:
            if dump:
                await dump_search(ret.data, dump)
            return ret.data or TextEnum.SearchResultEmpty.value
        else:
            return ret.message

    @staticmethod
    async def get_post(service: str, creator_id: str, post_id: str, *, dump: Path = None):
        """
        Get a specific post

        :param service: The service name
        :param creator_id: The creator's ID
        :param post_id: The post ID
        :param dump: Dump the result to a JSON file
        """
        logger.info(repr(config))
        ret = await get_post_api(
            service=service,
            creator_id=creator_id,
            post_id=post_id
        )
        if ret:
            if dump:
                async with aiofiles.open(str(dump), "w", encoding="utf-8") as f:
                    await f.write(
                        ret.data.post.model_dump_json(indent=config.json_dump_indent)
                    )
            return ret.data.post
        else:
            return ret.message

    @staticmethod
    @overload
    async def download_post(
            url: str,
            path: Union[Path, str] = Path("."),
            *,
            dump_post_data=True
    ):
        ...

    @staticmethod
    @overload
    async def download_post(
            service: str,
            creator_id: str,
            post_id: str,
            path: Union[Path, str] = Path("."),
            *,
            dump_post_data=True
    ):
        ...

    @staticmethod
    async def download_post(
            url: str = None,
            service: str = None,
            creator_id: str = None,
            post_id: str = None,
            path: Union[Path, str] = Path("."),
            *,
            dump_post_data=True
    ):
        """
        Download a specific post

        :param url: The post URL
        :param service: The service name
        :param creator_id: The creator's ID
        :param post_id: The post ID
        :param path: Download path, default is current directory
        :param dump_post_data: Whether to dump post data (post.json) in post directory
        """
        logger.info(repr(config))
        # Get service, creator_id, post_id
        if url:
            service, creator_id, post_id = parse_webpage_url(url)
        if not all([service, creator_id, post_id]):
            return generate_msg(
                TextEnum.MissingParams.value,
                use_at_lease_one=[
                    ["url"],
                    ["service", "creator_id", "post_id"]
                ])

        path = path if isinstance(path, Path) else Path(path)
        ret = await get_post_api(
            service=service,
            creator_id=creator_id,
            post_id=post_id
        )
        if ret:
            post_path = path / generate_post_path_name(ret.data.post)
            job_list = await create_job_from_post(
                post=ret.data.post,
                post_path=post_path,
                dump_post_data=dump_post_data
            )
            job_runner = JobRunner(job_list=job_list)
            await job_runner.start()
        else:
            return ret.message

    @staticmethod
    @overload
    async def sync_creator(
            url: str,
            path: Union[Path, str] = Path("."),
            *,
            save_creator_indices: bool = True,
            mix_posts: bool = None,
            start_time: str = None,
            end_time: str = None
    ):
        ...

    @staticmethod
    @overload
    async def sync_creator(
            service: str,
            creator_id: str,
            path: Union[Path, str] = Path("."),
            *,
            save_creator_indices: bool = True,
            mix_posts: bool = None,
            start_time: str = None,
            end_time: str = None
    ):
        ...

    @staticmethod
    async def sync_creator(
            url: str = None,
            service: str = None,
            creator_id: str = None,
            path: Union[Path, str] = Path("."),
            *,
            save_creator_indices: bool = False,
            mix_posts: bool = None,
            start_time: str = None,
            end_time: str = None,
            offset: int = 0,
            length: int = None
    ):
        """
        Sync posts from a creator

        You can update the directory anytime after download finished, \
        such as to update after creator published new posts.

        * ``start_time`` & ``end_time`` example: ``2023-12-7``, ``2023-12-07``

        :param url: The post URL
        :param service: The service where the post is located
        :param creator_id: The ID of the creator
        :param path: Download path, default is current directory
        :param save_creator_indices: Record ``CreatorIndices`` data
        :param mix_posts: Save all_pages files from different posts at same path, \
            ``save_creator_indices`` will be ignored if enabled
        :param start_time: Start time of the published time range for posts downloading. \
            Set to ``0`` if ``None`` was given. \
            Time format: ``%Y-%m-%d``
        :param end_time: End time of the published time range for posts downloading. \
            Set to latest time (infinity) if ``None`` was given. \
            Time format: ``%Y-%m-%d``
        :param offset: Result offset (or start offset)
        :param length: The number of posts to fetch, defaults to fetching all posts after ``offset``.
        """
        logger.info(repr(config))
        # Get service, creator_id
        if url:
            service, creator_id, _ = parse_webpage_url(url)
        if not all([service, creator_id]):
            return generate_msg(
                TextEnum.MissingParams.value,
                use_at_lease_one=[
                    ["url"],
                    ["service", "creator_id"]
                ])

        path = path if isinstance(path, Path) else Path(path)

        # Get creator name
        creator_name = creator_id
        creator_ret = await search_creator_action(id=creator_id, service=service)
        if creator_ret:
            creator = next(creator_ret.data, None)
            if creator:
                creator_name = creator.name
                logger.info(
                    generate_msg(
                        "Got creator information",
                        name=creator.name,
                        id=creator.id
                    )
                )
        else:
            logger.error(
                generate_msg(
                    f"Failed to fetch the name of creator <{creator_id}>",
                    detail=creator_ret.message
                )
            )
            return creator_ret.message

        creator_path = path / sanitize_filename(creator_name)

        creator_path.mkdir(exist_ok=True)
        ret = await create_job_from_creator(
            service=service,
            creator_id=creator_id,
            path=creator_path,
            all_pages=not length,
            offset=offset,
            length=length,
            save_creator_indices=save_creator_indices,
            mix_posts=mix_posts,
            start_time=datetime.strptime(start_time, "%Y-%m-%d") if start_time else None,
            end_time=datetime.strptime(end_time, "%Y-%m-%d") if end_time else None
        )
        if ret:
            job_runner = JobRunner(job_list=ret.data)
            await job_runner.start()
        else:
            return ret.message

config_editor() async staticmethod

Launch graphical KToolBox configuration editor

Source code in ktoolbox/cli.py
37
38
39
40
41
42
43
44
45
46
47
48
@staticmethod
async def config_editor():
    """Launch graphical KToolBox configuration editor"""
    try:
        from ktoolbox.editor import run_config_editor
        run_config_editor()
    except ModuleNotFoundError:
        logger.error(
            "You need to install extra dependencies to use the editor, "
            "run `pip install ktoolbox[urwid]` "
            "or `pipx install ktoolbox[urwid] --force` if you are using pipx"
        )

download_post(url=None, service=None, creator_id=None, post_id=None, path=Path('.'), *, dump_post_data=True) async staticmethod

Download a specific post

Parameters:

Name Type Description Default
url str

The post URL

None
service str

The service name

None
creator_id str

The creator's ID

None
post_id str

The post ID

None
path Union[Path, str]

Download path, default is current directory

Path('.')
dump_post_data

Whether to dump post data (post.json) in post directory

True
Source code in ktoolbox/cli.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
@staticmethod
async def download_post(
        url: str = None,
        service: str = None,
        creator_id: str = None,
        post_id: str = None,
        path: Union[Path, str] = Path("."),
        *,
        dump_post_data=True
):
    """
    Download a specific post

    :param url: The post URL
    :param service: The service name
    :param creator_id: The creator's ID
    :param post_id: The post ID
    :param path: Download path, default is current directory
    :param dump_post_data: Whether to dump post data (post.json) in post directory
    """
    logger.info(repr(config))
    # Get service, creator_id, post_id
    if url:
        service, creator_id, post_id = parse_webpage_url(url)
    if not all([service, creator_id, post_id]):
        return generate_msg(
            TextEnum.MissingParams.value,
            use_at_lease_one=[
                ["url"],
                ["service", "creator_id", "post_id"]
            ])

    path = path if isinstance(path, Path) else Path(path)
    ret = await get_post_api(
        service=service,
        creator_id=creator_id,
        post_id=post_id
    )
    if ret:
        post_path = path / generate_post_path_name(ret.data.post)
        job_list = await create_job_from_post(
            post=ret.data.post,
            post_path=post_path,
            dump_post_data=dump_post_data
        )
        job_runner = JobRunner(job_list=job_list)
        await job_runner.start()
    else:
        return ret.message

example_env() async staticmethod

Generate an example configuration .env file.

Source code in ktoolbox/cli.py
50
51
52
53
54
55
56
57
58
@staticmethod
async def example_env():
    """Generate an example configuration ``.env`` file."""
    print(
        render(
            OutputFormat.DOTENV,
            class_path=("ktoolbox.configuration.Configuration",)
        )
    )

get_post(service, creator_id, post_id, *, dump=None) async staticmethod

Get a specific post

Parameters:

Name Type Description Default
service str

The service name

required
creator_id str

The creator's ID

required
post_id str

The post ID

required
dump Path

Dump the result to a JSON file

None
Source code in ktoolbox/cli.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
@staticmethod
async def get_post(service: str, creator_id: str, post_id: str, *, dump: Path = None):
    """
    Get a specific post

    :param service: The service name
    :param creator_id: The creator's ID
    :param post_id: The post ID
    :param dump: Dump the result to a JSON file
    """
    logger.info(repr(config))
    ret = await get_post_api(
        service=service,
        creator_id=creator_id,
        post_id=post_id
    )
    if ret:
        if dump:
            async with aiofiles.open(str(dump), "w", encoding="utf-8") as f:
                await f.write(
                    ret.data.post.model_dump_json(indent=config.json_dump_indent)
                )
        return ret.data.post
    else:
        return ret.message

search_creator(name=None, id=None, service=None, *, dump=None) async staticmethod

Search creator, you can use multiple parameters as keywords.

Parameters:

Name Type Description Default
id str

The ID of the creator

None
name str

The name of the creator

None
service str

The service for the creator

None
dump Path

Dump the result to a JSON file

None
Source code in ktoolbox/cli.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@staticmethod
async def search_creator(
        name: str = None,
        id: str = None,
        service: str = None,
        *,
        dump: Path = None
):
    """
    Search creator, you can use multiple parameters as keywords.

    :param id: The ID of the creator
    :param name: The name of the creator
    :param service: The service for the creator
    :param dump: Dump the result to a JSON file
    """
    logger.info(repr(config))
    ret = await search_creator_action(id=id, name=name, service=service)
    if ret:
        result_list = list(ret.data)
        if dump:
            await dump_search(result_list, dump)
        return result_list or TextEnum.SearchResultEmpty.value
    else:
        return ret.message

search_creator_post(id=None, name=None, service=None, q=None, o=None, *, dump=None) async staticmethod

Search posts from creator, you can use multiple parameters as keywords.

Parameters:

Name Type Description Default
id str

The ID of the creator

None
name str

The name of the creator

None
service str

The service for the creator

None
q str

Search query

None
o int

Result offset, stepping of 50 is enforced

None
dump Path

Dump the result to a JSON file

None
Source code in ktoolbox/cli.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
@staticmethod
async def search_creator_post(
        id: str = None,
        name: str = None,
        service: str = None,
        q: str = None,
        o: int = None,
        *,
        dump: Path = None
):
    """
    Search posts from creator, you can use multiple parameters as keywords.

    :param id: The ID of the creator
    :param name: The name of the creator
    :param service: The service for the creator
    :param q: Search query
    :param o: Result offset, stepping of 50 is enforced
    :param dump: Dump the result to a JSON file
    """
    logger.info(repr(config))
    ret = await search_creator_post_action(id=id, name=name, service=service, q=q, o=o)
    if ret:
        if dump:
            await dump_search(ret.data, dump)
        return ret.data or TextEnum.SearchResultEmpty.value
    else:
        return ret.message

site_version() async staticmethod

Show current Kemono site app commit hash

Source code in ktoolbox/cli.py
29
30
31
32
33
34
35
@staticmethod
async def site_version():
    # noinspection SpellCheckingInspection
    """Show current Kemono site app commit hash"""
    logger.info(repr(config))
    ret = await get_app_version()
    return ret.data if ret else ret.message

sync_creator(url=None, service=None, creator_id=None, path=Path('.'), *, save_creator_indices=False, mix_posts=None, start_time=None, end_time=None, offset=0, length=None) async staticmethod

Sync posts from a creator

You can update the directory anytime after download finished, such as to update after creator published new posts.

  • start_time & end_time example: 2023-12-7, 2023-12-07

Parameters:

Name Type Description Default
url str

The post URL

None
service str

The service where the post is located

None
creator_id str

The ID of the creator

None
path Union[Path, str]

Download path, default is current directory

Path('.')
save_creator_indices bool

Record CreatorIndices data

False
mix_posts bool

Save all_pages files from different posts at same path, save_creator_indices will be ignored if enabled

None
start_time str

Start time of the published time range for posts downloading. Set to 0 if None was given. Time format: %Y-%m-%d

None
end_time str

End time of the published time range for posts downloading. Set to latest time (infinity) if None was given. Time format: %Y-%m-%d

None
offset int

Result offset (or start offset)

0
length int

The number of posts to fetch, defaults to fetching all posts after offset.

None
Source code in ktoolbox/cli.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
@staticmethod
async def sync_creator(
        url: str = None,
        service: str = None,
        creator_id: str = None,
        path: Union[Path, str] = Path("."),
        *,
        save_creator_indices: bool = False,
        mix_posts: bool = None,
        start_time: str = None,
        end_time: str = None,
        offset: int = 0,
        length: int = None
):
    """
    Sync posts from a creator

    You can update the directory anytime after download finished, \
    such as to update after creator published new posts.

    * ``start_time`` & ``end_time`` example: ``2023-12-7``, ``2023-12-07``

    :param url: The post URL
    :param service: The service where the post is located
    :param creator_id: The ID of the creator
    :param path: Download path, default is current directory
    :param save_creator_indices: Record ``CreatorIndices`` data
    :param mix_posts: Save all_pages files from different posts at same path, \
        ``save_creator_indices`` will be ignored if enabled
    :param start_time: Start time of the published time range for posts downloading. \
        Set to ``0`` if ``None`` was given. \
        Time format: ``%Y-%m-%d``
    :param end_time: End time of the published time range for posts downloading. \
        Set to latest time (infinity) if ``None`` was given. \
        Time format: ``%Y-%m-%d``
    :param offset: Result offset (or start offset)
    :param length: The number of posts to fetch, defaults to fetching all posts after ``offset``.
    """
    logger.info(repr(config))
    # Get service, creator_id
    if url:
        service, creator_id, _ = parse_webpage_url(url)
    if not all([service, creator_id]):
        return generate_msg(
            TextEnum.MissingParams.value,
            use_at_lease_one=[
                ["url"],
                ["service", "creator_id"]
            ])

    path = path if isinstance(path, Path) else Path(path)

    # Get creator name
    creator_name = creator_id
    creator_ret = await search_creator_action(id=creator_id, service=service)
    if creator_ret:
        creator = next(creator_ret.data, None)
        if creator:
            creator_name = creator.name
            logger.info(
                generate_msg(
                    "Got creator information",
                    name=creator.name,
                    id=creator.id
                )
            )
    else:
        logger.error(
            generate_msg(
                f"Failed to fetch the name of creator <{creator_id}>",
                detail=creator_ret.message
            )
        )
        return creator_ret.message

    creator_path = path / sanitize_filename(creator_name)

    creator_path.mkdir(exist_ok=True)
    ret = await create_job_from_creator(
        service=service,
        creator_id=creator_id,
        path=creator_path,
        all_pages=not length,
        offset=offset,
        length=length,
        save_creator_indices=save_creator_indices,
        mix_posts=mix_posts,
        start_time=datetime.strptime(start_time, "%Y-%m-%d") if start_time else None,
        end_time=datetime.strptime(end_time, "%Y-%m-%d") if end_time else None
    )
    if ret:
        job_runner = JobRunner(job_list=ret.data)
        await job_runner.start()
    else:
        return ret.message

version() async staticmethod

Show KToolBox version

Source code in ktoolbox/cli.py
24
25
26
27
@staticmethod
async def version():
    """Show KToolBox version"""
    return __version__

configuration

__all__ = ['config', 'APIConfiguration', 'DownloaderConfiguration', 'PostStructureConfiguration', 'JobConfiguration', 'LoggerConfiguration', 'Configuration'] module-attribute

config = Configuration(_env_file=['.env', 'prod.env']) module-attribute

APIConfiguration

Bases: BaseModel

Kemono API Configuration

Attributes:

Name Type Description
scheme Literal['http', 'https']

Kemono API URL scheme

netloc str

Kemono API URL netloc

statics_netloc str

URL netloc of Kemono server for static files (e.g. images)

files_netloc str

URL netloc of Kemono server for post files

path str

Kemono API URL root path

timeout float

API request timeout

retry_times int

API request retry times (when request failed)

retry_interval float

Seconds of API request retry interval

Source code in ktoolbox/configuration.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class APIConfiguration(BaseModel):
    """
    Kemono API Configuration

    :ivar scheme: Kemono API URL scheme
    :ivar netloc: Kemono API URL netloc
    :ivar statics_netloc: URL netloc of Kemono server for static files (e.g. images)
    :ivar files_netloc: URL netloc of Kemono server for post files
    :ivar path: Kemono API URL root path
    :ivar timeout: API request timeout
    :ivar retry_times: API request retry times (when request failed)
    :ivar retry_interval: Seconds of API request retry interval
    """
    scheme: Literal["http", "https"] = "https"
    netloc: str = "kemono.su"
    statics_netloc: str = "img.kemono.su"
    files_netloc: str = "kemono.su"
    path: str = "/api/v1"
    timeout: float = 5.0
    retry_times: int = 3
    retry_interval: float = 2.0

files_netloc: str = 'kemono.su' class-attribute instance-attribute

netloc: str = 'kemono.su' class-attribute instance-attribute

path: str = '/api/v1' class-attribute instance-attribute

retry_interval: float = 2.0 class-attribute instance-attribute

retry_times: int = 3 class-attribute instance-attribute

scheme: Literal['http', 'https'] = 'https' class-attribute instance-attribute

statics_netloc: str = 'img.kemono.su' class-attribute instance-attribute

timeout: float = 5.0 class-attribute instance-attribute

Configuration

Bases: BaseSettings

KToolBox Configuration

Attributes:

Name Type Description
api APIConfiguration

Kemono API Configuration

downloader DownloaderConfiguration

File Downloader Configuration

job JobConfiguration

Download jobs Configuration

logger LoggerConfiguration

Logger configuration

ssl_verify bool

Enable SSL certificate verification for Kemono API server and download server

json_dump_indent int

Indent of JSON file dump

use_uvloop bool

Use uvloop for asyncio (Disabled on Windows by default) uvloop will improve concurrent performance, but it is not compatible with Windows. Install uvloop by pip install ktoolbox[uvloop] or it will not work.

Source code in ktoolbox/configuration.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
class Configuration(BaseSettings):
    # noinspection SpellCheckingInspection,GrazieInspection
    """
    KToolBox Configuration

    :ivar api: Kemono API Configuration
    :ivar downloader: File Downloader Configuration
    :ivar job: Download jobs Configuration
    :ivar logger: Logger configuration
    :ivar ssl_verify: Enable SSL certificate verification for Kemono API server and download server
    :ivar json_dump_indent: Indent of JSON file dump
    :ivar use_uvloop: Use uvloop for asyncio (Disabled on Windows by default) \
    uvloop will improve concurrent performance, but it is not compatible with Windows. \
    Install uvloop by `pip install ktoolbox[uvloop]` or it will not work.
    """
    api: APIConfiguration = APIConfiguration()
    downloader: DownloaderConfiguration = DownloaderConfiguration()
    job: JobConfiguration = JobConfiguration()
    logger: LoggerConfiguration = LoggerConfiguration()

    ssl_verify: bool = True
    json_dump_indent: int = 4
    use_uvloop: bool = True

    # noinspection SpellCheckingInspection
    model_config = SettingsConfigDict(
        env_prefix='ktoolbox_',
        env_nested_delimiter='__',
        env_file='.env',
        env_file_encoding='utf-8',
        extra='ignore'
    )

api: APIConfiguration = APIConfiguration() class-attribute instance-attribute

downloader: DownloaderConfiguration = DownloaderConfiguration() class-attribute instance-attribute

job: JobConfiguration = JobConfiguration() class-attribute instance-attribute

json_dump_indent: int = 4 class-attribute instance-attribute

logger: LoggerConfiguration = LoggerConfiguration() class-attribute instance-attribute

model_config = SettingsConfigDict(env_prefix='ktoolbox_', env_nested_delimiter='__', env_file='.env', env_file_encoding='utf-8', extra='ignore') class-attribute instance-attribute

ssl_verify: bool = True class-attribute instance-attribute

use_uvloop: bool = True class-attribute instance-attribute

DownloaderConfiguration

Bases: BaseModel

File Downloader Configuration

Attributes:

Name Type Description
scheme Literal['http', 'https']

Downloader URL scheme

timeout float

Downloader request timeout

encoding str

Charset for filename parsing and post content text saving

buffer_size int

Number of bytes of file I/O buffer for each downloading file

chunk_size int

Number of bytes of chunk of downloader stream

temp_suffix str

Temp filename suffix of downloading files

retry_times int

Downloader retry times (when download failed)

retry_stop_never bool

Never stop downloader from retrying (when download failed) (retry_times will be ignored when enabled)

retry_interval float

Seconds of downloader retry interval

use_bucket bool

Enable local storage bucket mode

bucket_path Path

Path of local storage bucket

Source code in ktoolbox/configuration.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class DownloaderConfiguration(BaseModel):
    """
    File Downloader Configuration

    :ivar scheme: Downloader URL scheme
    :ivar timeout: Downloader request timeout
    :ivar encoding: Charset for filename parsing and post content text saving
    :ivar buffer_size: Number of bytes of file I/O buffer for each downloading file
    :ivar chunk_size: Number of bytes of chunk of downloader stream
    :ivar temp_suffix: Temp filename suffix of downloading files
    :ivar retry_times: Downloader retry times (when download failed)
    :ivar retry_stop_never: Never stop downloader from retrying (when download failed) \
    (``retry_times`` will be ignored when enabled)
    :ivar retry_interval: Seconds of downloader retry interval
    :ivar use_bucket: Enable local storage bucket mode
    :ivar bucket_path: Path of local storage bucket
    """
    scheme: Literal["http", "https"] = "https"
    timeout: float = 30.0
    encoding: str = "utf-8"
    buffer_size: int = 20480
    chunk_size: int = 1024
    temp_suffix: str = "tmp"
    retry_times: int = 10
    retry_stop_never: bool = False
    retry_interval: float = 3.0
    use_bucket: bool = False
    bucket_path: Path = Path("./.ktoolbox/bucket_storage")

    @model_validator(mode="after")
    def check_bucket_path(self) -> "DownloaderConfiguration":
        if self.use_bucket:
            # noinspection PyBroadException
            try:
                bucket_path = Path(self.bucket_path)
                bucket_path.mkdir(parents=True, exist_ok=True)
                with tempfile.TemporaryFile(dir=bucket_path) as temp_file:
                    temp_link_file_path = f"{bucket_path / temp_file.name}.hlink"
                    os.link(temp_file.name, temp_link_file_path)
                    os.remove(temp_link_file_path)
            except Exception:
                self.use_bucket = False
                logger.exception(f"`DownloaderConfiguration.bucket_path` is not available, "
                                 f"`DownloaderConfiguration.use_bucket` has been disabled.")
        return self

bucket_path: Path = Path('./.ktoolbox/bucket_storage') class-attribute instance-attribute

buffer_size: int = 20480 class-attribute instance-attribute

chunk_size: int = 1024 class-attribute instance-attribute

encoding: str = 'utf-8' class-attribute instance-attribute

retry_interval: float = 3.0 class-attribute instance-attribute

retry_stop_never: bool = False class-attribute instance-attribute

retry_times: int = 10 class-attribute instance-attribute

scheme: Literal['http', 'https'] = 'https' class-attribute instance-attribute

temp_suffix: str = 'tmp' class-attribute instance-attribute

timeout: float = 30.0 class-attribute instance-attribute

use_bucket: bool = False class-attribute instance-attribute

check_bucket_path()

Source code in ktoolbox/configuration.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
@model_validator(mode="after")
def check_bucket_path(self) -> "DownloaderConfiguration":
    if self.use_bucket:
        # noinspection PyBroadException
        try:
            bucket_path = Path(self.bucket_path)
            bucket_path.mkdir(parents=True, exist_ok=True)
            with tempfile.TemporaryFile(dir=bucket_path) as temp_file:
                temp_link_file_path = f"{bucket_path / temp_file.name}.hlink"
                os.link(temp_file.name, temp_link_file_path)
                os.remove(temp_link_file_path)
        except Exception:
            self.use_bucket = False
            logger.exception(f"`DownloaderConfiguration.bucket_path` is not available, "
                             f"`DownloaderConfiguration.use_bucket` has been disabled.")
    return self

JobConfiguration

Bases: BaseModel

Download jobs Configuration

  • Available properties for post_dirname_format

    Property Type
    id String
    user String
    service String
    title String
    added Date
    published Date
    edited Date

Attributes:

Name Type Description
count int

Number of coroutines for concurrent download

post_dirname_format str

Customize the post directory name format, you can use some of the properties in Post. e.g. [{published}]{id} > [2024-1-1]123123, {user}_{published}_{title} > 234234_2024-1-1_HelloWorld

post_structure PostStructureConfiguration

Post path structure

mix_posts bool

Save all files from different posts at same path in creator directory. It would not create any post directory, and CreatorIndices would not been recorded.

sequential_filename bool

Rename attachments in numerical order, e.g. 1.png, 2.png, ...

filename_format str

Customize the filename format by inserting an empty {} to represent the basic filename. Similar to post_dirname_format, you can use some of the properties in Post. For example: {title}_{} could result in filenames like HelloWorld_b4b41de2-8736-480d-b5c3-ebf0d917561b, HelloWorld_af349b25-ac08-46d7-98fb-6ce99a237b90, etc. You can also use it with sequential_filename. For instance, [{published}]_{} could result in filenames like [2024-1-1]_1.png, [2024-1-1]_2.png, etc.

allow_list Set[str]

Download files which match these patterns (Unix shell-style), e.g. ["*.png"]

block_list Set[str]

Not to download files which match these patterns (Unix shell-style), e.g. ["*.psd","*.zip"]

Source code in ktoolbox/configuration.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
class JobConfiguration(BaseModel):
    """
    Download jobs Configuration

    - Available properties for ``post_dirname_format``

        | Property      | Type   |
        |---------------|--------|
        | ``id``        | String |
        | ``user``      | String |
        | ``service``   | String |
        | ``title``     | String |
        | ``added``     | Date   |
        | ``published`` | Date   |
        | ``edited``    | Date   |

    :ivar count: Number of coroutines for concurrent download
    :ivar post_dirname_format: Customize the post directory name format, you can use some of the \
    [properties][ktoolbox.configuration.JobConfiguration] in ``Post``. \
    e.g. ``[{published}]{id}`` > ``[2024-1-1]123123``, ``{user}_{published}_{title}`` > ``234234_2024-1-1_HelloWorld``
    :ivar post_structure: Post path structure
    :ivar mix_posts: Save all files from different posts at same path in creator directory. \
    It would not create any post directory, and ``CreatorIndices`` would not been recorded.
    :ivar sequential_filename: Rename attachments in numerical order, e.g. ``1.png``, ``2.png``, ...
    :ivar filename_format: Customize the filename format by inserting an empty ``{}`` to represent the basic filename.
    Similar to post_dirname_format, you can use some of the [properties][ktoolbox.configuration.JobConfiguration] \
    in Post. For example: ``{title}_{}`` could result in filenames like \
    ``HelloWorld_b4b41de2-8736-480d-b5c3-ebf0d917561b``, ``HelloWorld_af349b25-ac08-46d7-98fb-6ce99a237b90``, etc. \
    You can also use it with ``sequential_filename``. For instance, \
    ``[{published}]_{}`` could result in filenames like ``[2024-1-1]_1.png``, ``[2024-1-1]_2.png``, etc.
    :ivar allow_list: Download files which match these patterns (Unix shell-style), e.g. ``["*.png"]``
    :ivar block_list: Not to download files which match these patterns (Unix shell-style), e.g. ``["*.psd","*.zip"]``
    """
    count: int = 4
    post_dirname_format: str = "{title}"
    post_structure: PostStructureConfiguration = PostStructureConfiguration()
    mix_posts: bool = False
    sequential_filename: bool = False
    filename_format: str = "{}"
    allow_list: Set[str] = set()
    block_list: Set[str] = set()

allow_list: Set[str] = set() class-attribute instance-attribute

block_list: Set[str] = set() class-attribute instance-attribute

count: int = 4 class-attribute instance-attribute

filename_format: str = '{}' class-attribute instance-attribute

mix_posts: bool = False class-attribute instance-attribute

post_dirname_format: str = '{title}' class-attribute instance-attribute

post_structure: PostStructureConfiguration = PostStructureConfiguration() class-attribute instance-attribute

sequential_filename: bool = False class-attribute instance-attribute

LoggerConfiguration

Bases: BaseModel

Logger configuration

Attributes:

Name Type Description
path Optional[Path]

Path to save logs, None for disable log file output

level Union[str, int]

Log filter level

rotation Union[str, int, time, timedelta]

Log rotation

Source code in ktoolbox/configuration.py
160
161
162
163
164
165
166
167
168
169
170
class LoggerConfiguration(BaseModel):
    """
    Logger configuration

    :ivar path: Path to save logs, ``None`` for disable log file output
    :ivar level: Log filter level
    :ivar rotation: Log rotation
    """
    path: Optional[Path] = None
    level: Union[str, int] = logging.getLevelName(logging.DEBUG)
    rotation: Union[str, int, datetime.time, datetime.timedelta] = "1 week"

level: Union[str, int] = logging.getLevelName(logging.DEBUG) class-attribute instance-attribute

path: Optional[Path] = None class-attribute instance-attribute

rotation: Union[str, int, datetime.time, datetime.timedelta] = '1 week' class-attribute instance-attribute

PostStructureConfiguration

Bases: BaseModel

Post path structure model

  • Default:
    ..
    ├─ content.txt
    ├─ <Post file>
    ├─ <Post data (post.json)>
    └─ attachments
       ├─ 1.png
       └─ 2.png
    

Attributes:

Name Type Description
attachments Path

Sub path of attachment directory

content_filepath Path

Sub path of post content file

Source code in ktoolbox/configuration.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
class PostStructureConfiguration(BaseModel):
    # noinspection SpellCheckingInspection
    """
    Post path structure model

    - Default:
    ```
    ..
    ├─ content.txt
    ├─ <Post file>
    ├─ <Post data (post.json)>
    └─ attachments
       ├─ 1.png
       └─ 2.png
    ```

    :ivar attachments: Sub path of attachment directory
    :ivar content_filepath: Sub path of post content file
    """
    attachments: Path = Path("attachments")
    content_filepath: Path = Path("content.txt")

attachments: Path = Path('attachments') class-attribute instance-attribute

content_filepath: Path = Path('content.txt') class-attribute instance-attribute

downloader

Downloader

Attributes:

Name Type Description
_save_filename

The actual filename for saving.

Source code in ktoolbox/downloader/downloader.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
class Downloader:
    """
    :ivar _save_filename: The actual filename for saving.
    """
    client = httpx.AsyncClient(verify=config.ssl_verify)

    def __init__(
            self,
            url: str,
            path: Path,
            *,
            buffer_size: int = None,
            chunk_size: int = None,
            designated_filename: str = None,
            server_path: str = None
    ):
        # noinspection GrazieInspection
        """
        Initialize a file downloader

        - About filename:
            1. If ``designated_filename`` parameter is set, use it.
            2. Else if ``Content-Disposition`` is set in headers, use filename from it.
            3. Else use filename from 'file' part of ``server_path``.

        :param url: Download URL
        :param path: Directory path to save the file, which needs to be sanitized
        :param buffer_size: Number of bytes for file I/O buffer
        :param chunk_size: Number of bytes for chunk of download stream
        :param designated_filename: Manually specify the filename for saving, which needs to be sanitized
        :param server_path: Server path of the file. if ``DownloaderConfiguration.use_bucket`` enabled, \
        it will be used as the save path.
        """

        self._url = url
        self._path = path
        self._buffer_size = buffer_size or config.downloader.buffer_size
        self._chunk_size = chunk_size or config.downloader.chunk_size
        self._designated_filename = designated_filename
        self._server_path = server_path  # /hash[:1]/hash2[1:3]/hash
        self._save_filename = designated_filename  # Prioritize the manually specified filename

        self._lock = asyncio.Lock()
        self._stop: bool = False

    @cached_property
    def url(self) -> str:
        """Download URL"""
        return self._url

    @cached_property
    def path(self) -> Path:
        """Directory path to save the file"""
        return self._path

    @cached_property
    def buffer_size(self) -> int:
        """Number of bytes for file I/O buffer"""
        return self._buffer_size

    @cached_property
    def chunk_size(self) -> int:
        """Number of bytes for chunk of download stream"""
        return self._chunk_size

    @property
    def filename(self) -> Optional[str]:
        """Actual filename of the download file"""
        return self._save_filename

    @property
    def finished(self) -> bool:
        """
        Check if the download finished

        :return: ``False`` if the download **in process**, ``True`` otherwise
        """
        return not self._lock.locked()

    def cancel(self):
        """
        Cancel the download

        It will raise ``asyncio.CancelledError`` in ``chunk_iterator`` (writing chunk to file) iteration.
        """
        self._stop = True

    @tenacity.retry(
        stop=stop_never if config.downloader.retry_stop_never else stop_after_attempt(config.downloader.retry_times),
        wait=wait_fixed(config.downloader.retry_interval),
        retry=retry_if_result(
            lambda x: not x and x.code != RetCodeEnum.FileExisted
        ) | retry_if_exception(
            lambda x: isinstance(x, httpx.HTTPError)
        ),
        before_sleep=lambda x: logger.warning(
            generate_msg(
                f"Retrying ({x.attempt_number})",
                message=x.outcome.result().message if not x.outcome.failed else None,
                exception=x.outcome.exception()
            )
        ),
        reraise=True
    )
    async def run(
            self,
            *,
            sync_callable: Callable[["Downloader"], Any] = None,
            async_callable: Callable[["Downloader"], Coroutine] = None,
            tqdm_class: Type[std_tqdm] = None,
            progress: bool = False
    ) -> DownloaderRet[str]:
        """
        Start to download

        :param sync_callable: Sync callable for download finished
        :param async_callable: Async callable for download finished
        :param tqdm_class: ``tqdm`` class to replace default ``tqdm.asyncio.tqdm``
        :param progress: Show progress bar
        :return: ``DownloaderRet`` which contain the actual output filename
        :raise CancelledError: Job cancelled
        """
        # Get filename to check if file exists (First-time duplicate file check)
        # Check it before request to make progress more efficiency
        server_relpath = self._server_path[1:]
        server_relpath_without_params = urlparse(server_relpath).path
        server_path_filename = unquote(Path(server_relpath_without_params).name)
        # Priority order can be referenced from the constructor's documentation
        save_filepath = self._path / (self._save_filename or server_path_filename)

        # Get bucket file path
        bucket_file_path: Optional[Path] = None
        if config.downloader.use_bucket:
            bucket_file_path = config.downloader.bucket_path / server_relpath

        # Check if the file exists
        file_existed, ret_msg = duplicate_file_check(save_filepath, bucket_file_path)
        if file_existed:
            return DownloaderRet(
                code=RetCodeEnum.FileExisted,
                message=generate_msg(
                    ret_msg,
                    path=save_filepath
                )
            )

        tqdm_class: Type[std_tqdm] = tqdm_class or tqdm.asyncio.tqdm
        async with self._lock:
            temp_filepath = Path(f"{save_filepath}.{config.downloader.temp_suffix}")
            temp_size = temp_filepath.stat().st_size if temp_filepath.exists() else 0

            async with self.client.stream(
                    method="GET",
                    url=self._url,
                    follow_redirects=True,
                    timeout=config.downloader.timeout,
                    headers={"Range": f"bytes={temp_size}-"}
            ) as res:  # type: httpx.Response
                if res.status_code != httpx.codes.PARTIAL_CONTENT:
                    return DownloaderRet(
                        code=RetCodeEnum.GeneralFailure,
                        message=generate_msg(
                            "Download failed",
                            status_code=res.status_code,
                            filename=save_filepath
                        )
                    )

                # Get filename for saving and check if file exists (Second-time duplicate file check)
                # Priority order can be referenced from the constructor's documentation
                self._save_filename = self._designated_filename or sanitize_filename(
                    filename_from_headers(res.headers)
                ) or server_path_filename
                save_filepath = self._path / self._save_filename
                file_existed, ret_msg = duplicate_file_check(save_filepath, bucket_file_path)
                if file_existed:
                    return DownloaderRet(
                        code=RetCodeEnum.FileExisted,
                        message=generate_msg(
                            ret_msg,
                            path=save_filepath
                        )
                    )

                # Download
                total_size = int(range_str.split("/")[-1]) if (range_str := res.headers.get("Content-Range")) else None
                async with aiofiles.open(str(temp_filepath), "ab", self._buffer_size) as f:
                    chunk_iterator = res.aiter_bytes(self._chunk_size)
                    t = tqdm_class(
                        desc=self._save_filename,
                        total=total_size,
                        initial=temp_size,
                        disable=not progress,
                        unit="B",
                        unit_scale=True
                    )
                    async for chunk in chunk_iterator:
                        if self._stop:
                            raise CancelledError
                        await f.write(chunk)
                        t.update(len(chunk))  # Update progress bar

            # Download finished
            if config.downloader.use_bucket:
                bucket_file_path.parent.mkdir(parents=True, exist_ok=True)
                os.link(temp_filepath, bucket_file_path)
            temp_filepath.rename(self._path / self._save_filename)

            # Callbacks
            if sync_callable:
                sync_callable(self)
            if async_callable:
                await async_callable(self)

            return DownloaderRet(
                data=self._save_filename
            ) if self._save_filename else DownloaderRet(
                code=RetCodeEnum.GeneralFailure,
                message=generate_msg(
                    "Download failed",
                    filename=self._designated_filename
                )
            )

    __call__ = run

__call__ = run class-attribute instance-attribute

buffer_size: int cached property

Number of bytes for file I/O buffer

chunk_size: int cached property

Number of bytes for chunk of download stream

client = httpx.AsyncClient(verify=config.ssl_verify) class-attribute instance-attribute

filename: Optional[str] property

Actual filename of the download file

finished: bool property

Check if the download finished

Returns:

Type Description
bool

False if the download in process, True otherwise

path: Path cached property

Directory path to save the file

url: str cached property

Download URL

__init__(url, path, *, buffer_size=None, chunk_size=None, designated_filename=None, server_path=None)

Initialize a file downloader

  • About filename:
    1. If designated_filename parameter is set, use it.
    2. Else if Content-Disposition is set in headers, use filename from it.
    3. Else use filename from 'file' part of server_path.

Parameters:

Name Type Description Default
url str

Download URL

required
path Path

Directory path to save the file, which needs to be sanitized

required
buffer_size int

Number of bytes for file I/O buffer

None
chunk_size int

Number of bytes for chunk of download stream

None
designated_filename str

Manually specify the filename for saving, which needs to be sanitized

None
server_path str

Server path of the file. if DownloaderConfiguration.use_bucket enabled, it will be used as the save path.

None
Source code in ktoolbox/downloader/downloader.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def __init__(
        self,
        url: str,
        path: Path,
        *,
        buffer_size: int = None,
        chunk_size: int = None,
        designated_filename: str = None,
        server_path: str = None
):
    # noinspection GrazieInspection
    """
    Initialize a file downloader

    - About filename:
        1. If ``designated_filename`` parameter is set, use it.
        2. Else if ``Content-Disposition`` is set in headers, use filename from it.
        3. Else use filename from 'file' part of ``server_path``.

    :param url: Download URL
    :param path: Directory path to save the file, which needs to be sanitized
    :param buffer_size: Number of bytes for file I/O buffer
    :param chunk_size: Number of bytes for chunk of download stream
    :param designated_filename: Manually specify the filename for saving, which needs to be sanitized
    :param server_path: Server path of the file. if ``DownloaderConfiguration.use_bucket`` enabled, \
    it will be used as the save path.
    """

    self._url = url
    self._path = path
    self._buffer_size = buffer_size or config.downloader.buffer_size
    self._chunk_size = chunk_size or config.downloader.chunk_size
    self._designated_filename = designated_filename
    self._server_path = server_path  # /hash[:1]/hash2[1:3]/hash
    self._save_filename = designated_filename  # Prioritize the manually specified filename

    self._lock = asyncio.Lock()
    self._stop: bool = False

cancel()

Cancel the download

It will raise asyncio.CancelledError in chunk_iterator (writing chunk to file) iteration.

Source code in ktoolbox/downloader/downloader.py
107
108
109
110
111
112
113
def cancel(self):
    """
    Cancel the download

    It will raise ``asyncio.CancelledError`` in ``chunk_iterator`` (writing chunk to file) iteration.
    """
    self._stop = True

run(*, sync_callable=None, async_callable=None, tqdm_class=None, progress=False) async

Start to download

Parameters:

Name Type Description Default
sync_callable Callable[[Downloader], Any]

Sync callable for download finished

None
async_callable Callable[[Downloader], Coroutine]

Async callable for download finished

None
tqdm_class Type[tqdm]

tqdm class to replace default tqdm.asyncio.tqdm

None
progress bool

Show progress bar

False

Returns:

Type Description
DownloaderRet[str]

DownloaderRet which contain the actual output filename

Raises:

Type Description
CancelledError

Job cancelled

Source code in ktoolbox/downloader/downloader.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
@tenacity.retry(
    stop=stop_never if config.downloader.retry_stop_never else stop_after_attempt(config.downloader.retry_times),
    wait=wait_fixed(config.downloader.retry_interval),
    retry=retry_if_result(
        lambda x: not x and x.code != RetCodeEnum.FileExisted
    ) | retry_if_exception(
        lambda x: isinstance(x, httpx.HTTPError)
    ),
    before_sleep=lambda x: logger.warning(
        generate_msg(
            f"Retrying ({x.attempt_number})",
            message=x.outcome.result().message if not x.outcome.failed else None,
            exception=x.outcome.exception()
        )
    ),
    reraise=True
)
async def run(
        self,
        *,
        sync_callable: Callable[["Downloader"], Any] = None,
        async_callable: Callable[["Downloader"], Coroutine] = None,
        tqdm_class: Type[std_tqdm] = None,
        progress: bool = False
) -> DownloaderRet[str]:
    """
    Start to download

    :param sync_callable: Sync callable for download finished
    :param async_callable: Async callable for download finished
    :param tqdm_class: ``tqdm`` class to replace default ``tqdm.asyncio.tqdm``
    :param progress: Show progress bar
    :return: ``DownloaderRet`` which contain the actual output filename
    :raise CancelledError: Job cancelled
    """
    # Get filename to check if file exists (First-time duplicate file check)
    # Check it before request to make progress more efficiency
    server_relpath = self._server_path[1:]
    server_relpath_without_params = urlparse(server_relpath).path
    server_path_filename = unquote(Path(server_relpath_without_params).name)
    # Priority order can be referenced from the constructor's documentation
    save_filepath = self._path / (self._save_filename or server_path_filename)

    # Get bucket file path
    bucket_file_path: Optional[Path] = None
    if config.downloader.use_bucket:
        bucket_file_path = config.downloader.bucket_path / server_relpath

    # Check if the file exists
    file_existed, ret_msg = duplicate_file_check(save_filepath, bucket_file_path)
    if file_existed:
        return DownloaderRet(
            code=RetCodeEnum.FileExisted,
            message=generate_msg(
                ret_msg,
                path=save_filepath
            )
        )

    tqdm_class: Type[std_tqdm] = tqdm_class or tqdm.asyncio.tqdm
    async with self._lock:
        temp_filepath = Path(f"{save_filepath}.{config.downloader.temp_suffix}")
        temp_size = temp_filepath.stat().st_size if temp_filepath.exists() else 0

        async with self.client.stream(
                method="GET",
                url=self._url,
                follow_redirects=True,
                timeout=config.downloader.timeout,
                headers={"Range": f"bytes={temp_size}-"}
        ) as res:  # type: httpx.Response
            if res.status_code != httpx.codes.PARTIAL_CONTENT:
                return DownloaderRet(
                    code=RetCodeEnum.GeneralFailure,
                    message=generate_msg(
                        "Download failed",
                        status_code=res.status_code,
                        filename=save_filepath
                    )
                )

            # Get filename for saving and check if file exists (Second-time duplicate file check)
            # Priority order can be referenced from the constructor's documentation
            self._save_filename = self._designated_filename or sanitize_filename(
                filename_from_headers(res.headers)
            ) or server_path_filename
            save_filepath = self._path / self._save_filename
            file_existed, ret_msg = duplicate_file_check(save_filepath, bucket_file_path)
            if file_existed:
                return DownloaderRet(
                    code=RetCodeEnum.FileExisted,
                    message=generate_msg(
                        ret_msg,
                        path=save_filepath
                    )
                )

            # Download
            total_size = int(range_str.split("/")[-1]) if (range_str := res.headers.get("Content-Range")) else None
            async with aiofiles.open(str(temp_filepath), "ab", self._buffer_size) as f:
                chunk_iterator = res.aiter_bytes(self._chunk_size)
                t = tqdm_class(
                    desc=self._save_filename,
                    total=total_size,
                    initial=temp_size,
                    disable=not progress,
                    unit="B",
                    unit_scale=True
                )
                async for chunk in chunk_iterator:
                    if self._stop:
                        raise CancelledError
                    await f.write(chunk)
                    t.update(len(chunk))  # Update progress bar

        # Download finished
        if config.downloader.use_bucket:
            bucket_file_path.parent.mkdir(parents=True, exist_ok=True)
            os.link(temp_filepath, bucket_file_path)
        temp_filepath.rename(self._path / self._save_filename)

        # Callbacks
        if sync_callable:
            sync_callable(self)
        if async_callable:
            await async_callable(self)

        return DownloaderRet(
            data=self._save_filename
        ) if self._save_filename else DownloaderRet(
            code=RetCodeEnum.GeneralFailure,
            message=generate_msg(
                "Download failed",
                filename=self._designated_filename
            )
        )

DownloaderRet

Bases: BaseRet[_T]

Return data model of action call

Source code in ktoolbox/downloader/base.py
10
11
12
class DownloaderRet(BaseRet[_T]):
    """Return data model of action call"""
    pass

duplicate_file_check(local_file_path, bucket_file_path=None)

Check if the file existed, and link the bucket filepath to local filepath if DownloaderConfiguration.use_bucket enabled.

Parameters:

Name Type Description Default
local_file_path Path

Download target path

required
bucket_file_path Path

The bucket filepath of the local download path

None

Returns:

Type Description
Tuple[bool, Optional[str]]

(if file existed, message)

Source code in ktoolbox/downloader/utils.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def duplicate_file_check(local_file_path: Path, bucket_file_path: Path = None) -> Tuple[bool, Optional[str]]:
    """
    Check if the file existed, and link the bucket filepath to local filepath \
    if ``DownloaderConfiguration.use_bucket`` enabled.

    :param local_file_path: Download target path
    :param bucket_file_path: The bucket filepath of the local download path
    :return: ``(if file existed, message)``
    """
    duplicate_check_path = bucket_file_path or local_file_path
    if duplicate_check_path.is_file():
        if config.downloader.use_bucket:
            ret_msg = "Download file already exists in both bucket and local, skipping"
            if not local_file_path.is_file():
                ret_msg = "Download file already exists in bucket, linking to local path"
                os.link(bucket_file_path, local_file_path)
        else:
            ret_msg = "Download file already exists, skipping"
        return True, ret_msg
    else:
        return False, None

filename_from_headers(headers)

Get file name from headers.

Parse from Content-Disposition.

  • Example:

    filename_from_headers('attachment;filename*=utf-8\'\'README%2Emd;filename="README.md"')
    

  • Return:

    README.md
    

Parameters:

Name Type Description Default
headers Dict[str, str]

HTTP headers

required

Returns:

Type Description
Optional[str]

File name

Source code in ktoolbox/downloader/utils.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def filename_from_headers(headers: Dict[str, str]) -> Optional[str]:
    """
    Get file name from headers.

    Parse from ``Content-Disposition``.

    - Example:
    ```
    filename_from_headers('attachment;filename*=utf-8\\'\\'README%2Emd;filename="README.md"')
    ```

    - Return:
    ```
    README.md
    ```

    :param headers: HTTP headers
    :return: File name
    """
    if not (disposition := headers.get("Content-Disposition")):
        if not (disposition := headers.get("content-disposition")):
            return None
    _, options = cgi.parse_header(disposition)  # alternative: `parse_header` in `utils.py`
    if filename := options.get("filename*"):
        if len(name_with_charset := filename.split("''")) == 2:
            charset, name = name_with_charset
            return urllib.parse.unquote(name, charset)
    if filename := options.get("filename"):
        return urllib.parse.unquote(filename, config.downloader.encoding)
    return None

base

__all__ = ['DownloaderRet'] module-attribute

DownloaderRet

Bases: BaseRet[_T]

Return data model of action call

Source code in ktoolbox/downloader/base.py
10
11
12
class DownloaderRet(BaseRet[_T]):
    """Return data model of action call"""
    pass

downloader

__all__ = ['Downloader'] module-attribute

Downloader

Attributes:

Name Type Description
_save_filename

The actual filename for saving.

Source code in ktoolbox/downloader/downloader.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
class Downloader:
    """
    :ivar _save_filename: The actual filename for saving.
    """
    client = httpx.AsyncClient(verify=config.ssl_verify)

    def __init__(
            self,
            url: str,
            path: Path,
            *,
            buffer_size: int = None,
            chunk_size: int = None,
            designated_filename: str = None,
            server_path: str = None
    ):
        # noinspection GrazieInspection
        """
        Initialize a file downloader

        - About filename:
            1. If ``designated_filename`` parameter is set, use it.
            2. Else if ``Content-Disposition`` is set in headers, use filename from it.
            3. Else use filename from 'file' part of ``server_path``.

        :param url: Download URL
        :param path: Directory path to save the file, which needs to be sanitized
        :param buffer_size: Number of bytes for file I/O buffer
        :param chunk_size: Number of bytes for chunk of download stream
        :param designated_filename: Manually specify the filename for saving, which needs to be sanitized
        :param server_path: Server path of the file. if ``DownloaderConfiguration.use_bucket`` enabled, \
        it will be used as the save path.
        """

        self._url = url
        self._path = path
        self._buffer_size = buffer_size or config.downloader.buffer_size
        self._chunk_size = chunk_size or config.downloader.chunk_size
        self._designated_filename = designated_filename
        self._server_path = server_path  # /hash[:1]/hash2[1:3]/hash
        self._save_filename = designated_filename  # Prioritize the manually specified filename

        self._lock = asyncio.Lock()
        self._stop: bool = False

    @cached_property
    def url(self) -> str:
        """Download URL"""
        return self._url

    @cached_property
    def path(self) -> Path:
        """Directory path to save the file"""
        return self._path

    @cached_property
    def buffer_size(self) -> int:
        """Number of bytes for file I/O buffer"""
        return self._buffer_size

    @cached_property
    def chunk_size(self) -> int:
        """Number of bytes for chunk of download stream"""
        return self._chunk_size

    @property
    def filename(self) -> Optional[str]:
        """Actual filename of the download file"""
        return self._save_filename

    @property
    def finished(self) -> bool:
        """
        Check if the download finished

        :return: ``False`` if the download **in process**, ``True`` otherwise
        """
        return not self._lock.locked()

    def cancel(self):
        """
        Cancel the download

        It will raise ``asyncio.CancelledError`` in ``chunk_iterator`` (writing chunk to file) iteration.
        """
        self._stop = True

    @tenacity.retry(
        stop=stop_never if config.downloader.retry_stop_never else stop_after_attempt(config.downloader.retry_times),
        wait=wait_fixed(config.downloader.retry_interval),
        retry=retry_if_result(
            lambda x: not x and x.code != RetCodeEnum.FileExisted
        ) | retry_if_exception(
            lambda x: isinstance(x, httpx.HTTPError)
        ),
        before_sleep=lambda x: logger.warning(
            generate_msg(
                f"Retrying ({x.attempt_number})",
                message=x.outcome.result().message if not x.outcome.failed else None,
                exception=x.outcome.exception()
            )
        ),
        reraise=True
    )
    async def run(
            self,
            *,
            sync_callable: Callable[["Downloader"], Any] = None,
            async_callable: Callable[["Downloader"], Coroutine] = None,
            tqdm_class: Type[std_tqdm] = None,
            progress: bool = False
    ) -> DownloaderRet[str]:
        """
        Start to download

        :param sync_callable: Sync callable for download finished
        :param async_callable: Async callable for download finished
        :param tqdm_class: ``tqdm`` class to replace default ``tqdm.asyncio.tqdm``
        :param progress: Show progress bar
        :return: ``DownloaderRet`` which contain the actual output filename
        :raise CancelledError: Job cancelled
        """
        # Get filename to check if file exists (First-time duplicate file check)
        # Check it before request to make progress more efficiency
        server_relpath = self._server_path[1:]
        server_relpath_without_params = urlparse(server_relpath).path
        server_path_filename = unquote(Path(server_relpath_without_params).name)
        # Priority order can be referenced from the constructor's documentation
        save_filepath = self._path / (self._save_filename or server_path_filename)

        # Get bucket file path
        bucket_file_path: Optional[Path] = None
        if config.downloader.use_bucket:
            bucket_file_path = config.downloader.bucket_path / server_relpath

        # Check if the file exists
        file_existed, ret_msg = duplicate_file_check(save_filepath, bucket_file_path)
        if file_existed:
            return DownloaderRet(
                code=RetCodeEnum.FileExisted,
                message=generate_msg(
                    ret_msg,
                    path=save_filepath
                )
            )

        tqdm_class: Type[std_tqdm] = tqdm_class or tqdm.asyncio.tqdm
        async with self._lock:
            temp_filepath = Path(f"{save_filepath}.{config.downloader.temp_suffix}")
            temp_size = temp_filepath.stat().st_size if temp_filepath.exists() else 0

            async with self.client.stream(
                    method="GET",
                    url=self._url,
                    follow_redirects=True,
                    timeout=config.downloader.timeout,
                    headers={"Range": f"bytes={temp_size}-"}
            ) as res:  # type: httpx.Response
                if res.status_code != httpx.codes.PARTIAL_CONTENT:
                    return DownloaderRet(
                        code=RetCodeEnum.GeneralFailure,
                        message=generate_msg(
                            "Download failed",
                            status_code=res.status_code,
                            filename=save_filepath
                        )
                    )

                # Get filename for saving and check if file exists (Second-time duplicate file check)
                # Priority order can be referenced from the constructor's documentation
                self._save_filename = self._designated_filename or sanitize_filename(
                    filename_from_headers(res.headers)
                ) or server_path_filename
                save_filepath = self._path / self._save_filename
                file_existed, ret_msg = duplicate_file_check(save_filepath, bucket_file_path)
                if file_existed:
                    return DownloaderRet(
                        code=RetCodeEnum.FileExisted,
                        message=generate_msg(
                            ret_msg,
                            path=save_filepath
                        )
                    )

                # Download
                total_size = int(range_str.split("/")[-1]) if (range_str := res.headers.get("Content-Range")) else None
                async with aiofiles.open(str(temp_filepath), "ab", self._buffer_size) as f:
                    chunk_iterator = res.aiter_bytes(self._chunk_size)
                    t = tqdm_class(
                        desc=self._save_filename,
                        total=total_size,
                        initial=temp_size,
                        disable=not progress,
                        unit="B",
                        unit_scale=True
                    )
                    async for chunk in chunk_iterator:
                        if self._stop:
                            raise CancelledError
                        await f.write(chunk)
                        t.update(len(chunk))  # Update progress bar

            # Download finished
            if config.downloader.use_bucket:
                bucket_file_path.parent.mkdir(parents=True, exist_ok=True)
                os.link(temp_filepath, bucket_file_path)
            temp_filepath.rename(self._path / self._save_filename)

            # Callbacks
            if sync_callable:
                sync_callable(self)
            if async_callable:
                await async_callable(self)

            return DownloaderRet(
                data=self._save_filename
            ) if self._save_filename else DownloaderRet(
                code=RetCodeEnum.GeneralFailure,
                message=generate_msg(
                    "Download failed",
                    filename=self._designated_filename
                )
            )

    __call__ = run
__call__ = run class-attribute instance-attribute
buffer_size: int cached property

Number of bytes for file I/O buffer

chunk_size: int cached property

Number of bytes for chunk of download stream

client = httpx.AsyncClient(verify=config.ssl_verify) class-attribute instance-attribute
filename: Optional[str] property

Actual filename of the download file

finished: bool property

Check if the download finished

Returns:

Type Description
bool

False if the download in process, True otherwise

path: Path cached property

Directory path to save the file

url: str cached property

Download URL

__init__(url, path, *, buffer_size=None, chunk_size=None, designated_filename=None, server_path=None)

Initialize a file downloader

  • About filename:
    1. If designated_filename parameter is set, use it.
    2. Else if Content-Disposition is set in headers, use filename from it.
    3. Else use filename from 'file' part of server_path.

Parameters:

Name Type Description Default
url str

Download URL

required
path Path

Directory path to save the file, which needs to be sanitized

required
buffer_size int

Number of bytes for file I/O buffer

None
chunk_size int

Number of bytes for chunk of download stream

None
designated_filename str

Manually specify the filename for saving, which needs to be sanitized

None
server_path str

Server path of the file. if DownloaderConfiguration.use_bucket enabled, it will be used as the save path.

None
Source code in ktoolbox/downloader/downloader.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def __init__(
        self,
        url: str,
        path: Path,
        *,
        buffer_size: int = None,
        chunk_size: int = None,
        designated_filename: str = None,
        server_path: str = None
):
    # noinspection GrazieInspection
    """
    Initialize a file downloader

    - About filename:
        1. If ``designated_filename`` parameter is set, use it.
        2. Else if ``Content-Disposition`` is set in headers, use filename from it.
        3. Else use filename from 'file' part of ``server_path``.

    :param url: Download URL
    :param path: Directory path to save the file, which needs to be sanitized
    :param buffer_size: Number of bytes for file I/O buffer
    :param chunk_size: Number of bytes for chunk of download stream
    :param designated_filename: Manually specify the filename for saving, which needs to be sanitized
    :param server_path: Server path of the file. if ``DownloaderConfiguration.use_bucket`` enabled, \
    it will be used as the save path.
    """

    self._url = url
    self._path = path
    self._buffer_size = buffer_size or config.downloader.buffer_size
    self._chunk_size = chunk_size or config.downloader.chunk_size
    self._designated_filename = designated_filename
    self._server_path = server_path  # /hash[:1]/hash2[1:3]/hash
    self._save_filename = designated_filename  # Prioritize the manually specified filename

    self._lock = asyncio.Lock()
    self._stop: bool = False
cancel()

Cancel the download

It will raise asyncio.CancelledError in chunk_iterator (writing chunk to file) iteration.

Source code in ktoolbox/downloader/downloader.py
107
108
109
110
111
112
113
def cancel(self):
    """
    Cancel the download

    It will raise ``asyncio.CancelledError`` in ``chunk_iterator`` (writing chunk to file) iteration.
    """
    self._stop = True
run(*, sync_callable=None, async_callable=None, tqdm_class=None, progress=False) async

Start to download

Parameters:

Name Type Description Default
sync_callable Callable[[Downloader], Any]

Sync callable for download finished

None
async_callable Callable[[Downloader], Coroutine]

Async callable for download finished

None
tqdm_class Type[tqdm]

tqdm class to replace default tqdm.asyncio.tqdm

None
progress bool

Show progress bar

False

Returns:

Type Description
DownloaderRet[str]

DownloaderRet which contain the actual output filename

Raises:

Type Description
CancelledError

Job cancelled

Source code in ktoolbox/downloader/downloader.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
@tenacity.retry(
    stop=stop_never if config.downloader.retry_stop_never else stop_after_attempt(config.downloader.retry_times),
    wait=wait_fixed(config.downloader.retry_interval),
    retry=retry_if_result(
        lambda x: not x and x.code != RetCodeEnum.FileExisted
    ) | retry_if_exception(
        lambda x: isinstance(x, httpx.HTTPError)
    ),
    before_sleep=lambda x: logger.warning(
        generate_msg(
            f"Retrying ({x.attempt_number})",
            message=x.outcome.result().message if not x.outcome.failed else None,
            exception=x.outcome.exception()
        )
    ),
    reraise=True
)
async def run(
        self,
        *,
        sync_callable: Callable[["Downloader"], Any] = None,
        async_callable: Callable[["Downloader"], Coroutine] = None,
        tqdm_class: Type[std_tqdm] = None,
        progress: bool = False
) -> DownloaderRet[str]:
    """
    Start to download

    :param sync_callable: Sync callable for download finished
    :param async_callable: Async callable for download finished
    :param tqdm_class: ``tqdm`` class to replace default ``tqdm.asyncio.tqdm``
    :param progress: Show progress bar
    :return: ``DownloaderRet`` which contain the actual output filename
    :raise CancelledError: Job cancelled
    """
    # Get filename to check if file exists (First-time duplicate file check)
    # Check it before request to make progress more efficiency
    server_relpath = self._server_path[1:]
    server_relpath_without_params = urlparse(server_relpath).path
    server_path_filename = unquote(Path(server_relpath_without_params).name)
    # Priority order can be referenced from the constructor's documentation
    save_filepath = self._path / (self._save_filename or server_path_filename)

    # Get bucket file path
    bucket_file_path: Optional[Path] = None
    if config.downloader.use_bucket:
        bucket_file_path = config.downloader.bucket_path / server_relpath

    # Check if the file exists
    file_existed, ret_msg = duplicate_file_check(save_filepath, bucket_file_path)
    if file_existed:
        return DownloaderRet(
            code=RetCodeEnum.FileExisted,
            message=generate_msg(
                ret_msg,
                path=save_filepath
            )
        )

    tqdm_class: Type[std_tqdm] = tqdm_class or tqdm.asyncio.tqdm
    async with self._lock:
        temp_filepath = Path(f"{save_filepath}.{config.downloader.temp_suffix}")
        temp_size = temp_filepath.stat().st_size if temp_filepath.exists() else 0

        async with self.client.stream(
                method="GET",
                url=self._url,
                follow_redirects=True,
                timeout=config.downloader.timeout,
                headers={"Range": f"bytes={temp_size}-"}
        ) as res:  # type: httpx.Response
            if res.status_code != httpx.codes.PARTIAL_CONTENT:
                return DownloaderRet(
                    code=RetCodeEnum.GeneralFailure,
                    message=generate_msg(
                        "Download failed",
                        status_code=res.status_code,
                        filename=save_filepath
                    )
                )

            # Get filename for saving and check if file exists (Second-time duplicate file check)
            # Priority order can be referenced from the constructor's documentation
            self._save_filename = self._designated_filename or sanitize_filename(
                filename_from_headers(res.headers)
            ) or server_path_filename
            save_filepath = self._path / self._save_filename
            file_existed, ret_msg = duplicate_file_check(save_filepath, bucket_file_path)
            if file_existed:
                return DownloaderRet(
                    code=RetCodeEnum.FileExisted,
                    message=generate_msg(
                        ret_msg,
                        path=save_filepath
                    )
                )

            # Download
            total_size = int(range_str.split("/")[-1]) if (range_str := res.headers.get("Content-Range")) else None
            async with aiofiles.open(str(temp_filepath), "ab", self._buffer_size) as f:
                chunk_iterator = res.aiter_bytes(self._chunk_size)
                t = tqdm_class(
                    desc=self._save_filename,
                    total=total_size,
                    initial=temp_size,
                    disable=not progress,
                    unit="B",
                    unit_scale=True
                )
                async for chunk in chunk_iterator:
                    if self._stop:
                        raise CancelledError
                    await f.write(chunk)
                    t.update(len(chunk))  # Update progress bar

        # Download finished
        if config.downloader.use_bucket:
            bucket_file_path.parent.mkdir(parents=True, exist_ok=True)
            os.link(temp_filepath, bucket_file_path)
        temp_filepath.rename(self._path / self._save_filename)

        # Callbacks
        if sync_callable:
            sync_callable(self)
        if async_callable:
            await async_callable(self)

        return DownloaderRet(
            data=self._save_filename
        ) if self._save_filename else DownloaderRet(
            code=RetCodeEnum.GeneralFailure,
            message=generate_msg(
                "Download failed",
                filename=self._designated_filename
            )
        )

utils

__all__ = ['filename_from_headers', 'duplicate_file_check'] module-attribute

duplicate_file_check(local_file_path, bucket_file_path=None)

Check if the file existed, and link the bucket filepath to local filepath if DownloaderConfiguration.use_bucket enabled.

Parameters:

Name Type Description Default
local_file_path Path

Download target path

required
bucket_file_path Path

The bucket filepath of the local download path

None

Returns:

Type Description
Tuple[bool, Optional[str]]

(if file existed, message)

Source code in ktoolbox/downloader/utils.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def duplicate_file_check(local_file_path: Path, bucket_file_path: Path = None) -> Tuple[bool, Optional[str]]:
    """
    Check if the file existed, and link the bucket filepath to local filepath \
    if ``DownloaderConfiguration.use_bucket`` enabled.

    :param local_file_path: Download target path
    :param bucket_file_path: The bucket filepath of the local download path
    :return: ``(if file existed, message)``
    """
    duplicate_check_path = bucket_file_path or local_file_path
    if duplicate_check_path.is_file():
        if config.downloader.use_bucket:
            ret_msg = "Download file already exists in both bucket and local, skipping"
            if not local_file_path.is_file():
                ret_msg = "Download file already exists in bucket, linking to local path"
                os.link(bucket_file_path, local_file_path)
        else:
            ret_msg = "Download file already exists, skipping"
        return True, ret_msg
    else:
        return False, None

filename_from_headers(headers)

Get file name from headers.

Parse from Content-Disposition.

  • Example:

    filename_from_headers('attachment;filename*=utf-8\'\'README%2Emd;filename="README.md"')
    

  • Return:

    README.md
    

Parameters:

Name Type Description Default
headers Dict[str, str]

HTTP headers

required

Returns:

Type Description
Optional[str]

File name

Source code in ktoolbox/downloader/utils.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def filename_from_headers(headers: Dict[str, str]) -> Optional[str]:
    """
    Get file name from headers.

    Parse from ``Content-Disposition``.

    - Example:
    ```
    filename_from_headers('attachment;filename*=utf-8\\'\\'README%2Emd;filename="README.md"')
    ```

    - Return:
    ```
    README.md
    ```

    :param headers: HTTP headers
    :return: File name
    """
    if not (disposition := headers.get("Content-Disposition")):
        if not (disposition := headers.get("content-disposition")):
            return None
    _, options = cgi.parse_header(disposition)  # alternative: `parse_header` in `utils.py`
    if filename := options.get("filename*"):
        if len(name_with_charset := filename.split("''")) == 2:
            charset, name = name_with_charset
            return urllib.parse.unquote(name, charset)
    if filename := options.get("filename"):
        return urllib.parse.unquote(filename, config.downloader.encoding)
    return None

parse_header(line)

Alternative resolution for parsing header line.

Apply when cgi.parse_header is unable to use due to the deprecation of cgi module.

https://peps.python.org/pep-0594/#cgi

  • Example:

    parse_header("text/html; charset=utf-8")
    

  • Return:

    {'text/html': None, 'charset': 'utf-8'}
    

Parameters:

Name Type Description Default
line str

Header line

required

Returns:

Type Description
Dict[str, Optional[str]]

Dict of header line

Source code in ktoolbox/downloader/utils.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def parse_header(line: str) -> Dict[str, Optional[str]]:
    """
    Alternative resolution for parsing header line.

    Apply when ``cgi.parse_header`` is unable to use due to the deprecation of `cgi` module.

    https://peps.python.org/pep-0594/#cgi

    - Example:
    ```
    parse_header("text/html; charset=utf-8")
    ```

    - Return:
    ```
    {'text/html': None, 'charset': 'utf-8'}
    ```

    :param line: Header line
    :return: Dict of header line
    """
    dict_value: Dict[str, Optional[str]] = {}
    for item in line.split(";"):
        if len(pair := item.split("=")) == 1:
            dict_value[pair[0]] = None
        else:
            dict_value.setdefault(*pair)
    return dict_value

editor

__all__ = ['EditWithSignalWidget', 'CascadingBoxes', 'run_config_editor'] module-attribute

default_config = Configuration(_env_file='') module-attribute

default_config_envs = set(dump_envs(default_config)) module-attribute

initial_envs = set(dump_envs(config)) module-attribute

menu_top = menu('KToolBox Configuration Editor', [sub_menu('Edit', [sub_menu('API', model_to_widgets(config.api)), sub_menu('Downloader', model_to_widgets(config.downloader)), sub_menu('Job', model_to_widgets(config.job)), sub_menu('Logger', model_to_widgets(config.logger)), urwid.Divider()] + list(model_to_widgets(config, ['ssl_verify', 'json_dump_indent', 'use_uvloop']))), urwid.Divider(), menu_option(urwid.Button('JSON Preview', lambda x: top.open_box(sub_menu_with_menu_widget('JSON Preview', [urwid.Text(config.model_dump_json(indent=4))])[1]))), menu_option(urwid.Button('JSON Preview (Python Mode)', lambda x: top.open_box(sub_menu_with_menu_widget('JSON Preview (Python Serialize Mode)', [urwid.Text(pprint.pformat(config.model_dump(mode='python'), sort_dicts=False))])[1]))), menu_option(urwid.Button('DotEnv Preview (.env / prod.env)', lambda x: top.open_box(sub_menu_with_menu_widget('DotEnv Preview (.env / prod.env)', [urwid.Text('\n'.join(dump_modified_envs(dump_envs(config))) or 'Same as the default configuration, DotEnv will be left empty.')])[1]))), urwid.Divider(), sub_menu('Save', [menu_option(urwid.Button("Save to '.env' / 'prod.env' file", on_save_dotenv))]), urwid.Divider(bottom=2), menu_option(urwid.Button('Help', lambda x: webbrowser.open('https://ktoolbox.readthedocs.io/latest/configuration/guide/'))), menu_option(urwid.Button('Exit', exit_program)), urwid.Divider(bottom=2), urwid.Text('For detailed information, please refer to https://ktoolbox.readthedocs.io', align=urwid.CENTER), urwid.Divider(), urwid.Text(__version__, align=urwid.CENTER)]) module-attribute

top = CascadingBoxes(menu_top) module-attribute

CascadingBoxes

Bases: WidgetPlaceholder

Source code in ktoolbox/editor.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class CascadingBoxes(urwid.WidgetPlaceholder):
    max_box_levels = 4

    def __init__(self, box: urwid.Widget) -> None:
        super().__init__(urwid.SolidFill("/"))
        self.box_level = 0
        self.open_box(box)

    def open_box(self, box: urwid.Widget):
        self.original_widget = urwid.Overlay(
            urwid.LineBox(
                urwid.Padding(box, align=urwid.CENTER, left=2, right=2)
            ),
            self.original_widget,
            align=urwid.CENTER,
            width=(urwid.RELATIVE, 80),
            valign=urwid.MIDDLE,
            height=(urwid.RELATIVE, 80),
            min_width=24,
            min_height=8,
            left=self.box_level * 3,
            right=(self.max_box_levels - self.box_level - 1) * 3,
            top=self.box_level * 2,
            bottom=(self.max_box_levels - self.box_level - 1) * 2,
        )
        self.box_level += 1

    def back(self) -> Optional[NoReturn]:
        self.original_widget = self.original_widget[0]
        self.box_level -= 1
        return None

    def exit(self):
        raise urwid.ExitMainLoop()

    def keypress(self, size, key: str) -> Union[str, NoReturn, None]:
        if key == "esc":
            if self.box_level > 1:
                self.back()
            else:
                exit_program()
        return super().keypress(size, key)

box_level = 0 instance-attribute

max_box_levels = 4 class-attribute instance-attribute

__init__(box)

Source code in ktoolbox/editor.py
50
51
52
53
def __init__(self, box: urwid.Widget) -> None:
    super().__init__(urwid.SolidFill("/"))
    self.box_level = 0
    self.open_box(box)

back()

Source code in ktoolbox/editor.py
74
75
76
77
def back(self) -> Optional[NoReturn]:
    self.original_widget = self.original_widget[0]
    self.box_level -= 1
    return None

exit()

Source code in ktoolbox/editor.py
79
80
def exit(self):
    raise urwid.ExitMainLoop()

keypress(size, key)

Source code in ktoolbox/editor.py
82
83
84
85
86
87
88
def keypress(self, size, key: str) -> Union[str, NoReturn, None]:
    if key == "esc":
        if self.box_level > 1:
            self.back()
        else:
            exit_program()
    return super().keypress(size, key)

open_box(box)

Source code in ktoolbox/editor.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def open_box(self, box: urwid.Widget):
    self.original_widget = urwid.Overlay(
        urwid.LineBox(
            urwid.Padding(box, align=urwid.CENTER, left=2, right=2)
        ),
        self.original_widget,
        align=urwid.CENTER,
        width=(urwid.RELATIVE, 80),
        valign=urwid.MIDDLE,
        height=(urwid.RELATIVE, 80),
        min_width=24,
        min_height=8,
        left=self.box_level * 3,
        right=(self.max_box_levels - self.box_level - 1) * 3,
        top=self.box_level * 2,
        bottom=(self.max_box_levels - self.box_level - 1) * 2,
    )
    self.box_level += 1

EditWithSignalWidget

Bases: Edit

Custom urwid.Edit, support callback when changed.

Source code in ktoolbox/editor.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class EditWithSignalWidget(urwid.Edit):
    """
    Custom ``urwid.Edit``, support callback when changed.
    """

    def __init__(
            self,
            *args,
            on_state_change: Optional[Callable[[EditWithSignalWidget, _T], Any]],
            user_data: Optional[_T],
            **kwargs
    ) -> None:
        self.__on_state_change = on_state_change
        self.__user_data = user_data
        super().__init__(*args, **kwargs)

    def keypress(self, size: Tuple[int], key: str) -> Union[str, None]:
        ret = super().keypress(size, key)
        self.__on_state_change(self, self.__user_data)
        return ret

__on_state_change = on_state_change instance-attribute

__user_data = user_data instance-attribute

__init__(*args, on_state_change, user_data, **kwargs)

Source code in ktoolbox/editor.py
30
31
32
33
34
35
36
37
38
39
def __init__(
        self,
        *args,
        on_state_change: Optional[Callable[[EditWithSignalWidget, _T], Any]],
        user_data: Optional[_T],
        **kwargs
) -> None:
    self.__on_state_change = on_state_change
    self.__user_data = user_data
    super().__init__(*args, **kwargs)

keypress(size, key)

Source code in ktoolbox/editor.py
41
42
43
44
def keypress(self, size: Tuple[int], key: str) -> Union[str, None]:
    ret = super().keypress(size, key)
    self.__on_state_change(self, self.__user_data)
    return ret

dump_envs(model)

Dump environment variables, with no Env prefix

Source code in ktoolbox/editor.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def dump_envs(model: BaseModel) -> List[str]:
    """Dump environment variables, with no Env prefix"""
    envs = []
    for field in model.model_fields:
        value = model.__getattribute__(field)
        if isinstance(value, BaseModel):
            for env in dump_envs(value):
                envs.append(f"{field.upper()}__{env}")
        else:
            envs.append(
                f"{field.upper()}="
                f"{json.dumps(list(value)) if isinstance(value, (list, set, tuple, dict)) else model.__pydantic_serializer__.to_python(value)}"
            )
    return envs

dump_modified_envs(envs)

Dump modified environment variables, with Env prefix

Parameters:

Name Type Description Default
envs List[str]

Current Envs

required
Source code in ktoolbox/editor.py
107
108
109
110
111
112
113
114
115
def dump_modified_envs(envs: List[str]) -> List[str]:
    """
    Dump modified environment variables, with Env prefix

    :param envs: Current Envs
    """
    return sorted([
        f"KTOOLBOX_{env}" for env in set(envs) - default_config_envs
    ])

exit_program(_=None)

Source code in ktoolbox/editor.py
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
def exit_program(_: urwid.Button = None) -> Optional[NoReturn]:
    if has_changed():
        top.open_box(
            urwid.Filler(
                urwid.Pile([
                    urwid.Text("Any unsaved changes will be lost. Are you sure you want to EXIT?"),
                    urwid.Divider(),
                    menu_option(urwid.Button(
                        "NO", lambda x: top.back()
                    )),
                    menu_option(urwid.Button(
                        "YES", lambda x: top.exit()
                    )),
                ])
            )
        )
    else:
        top.exit()

get_item(model, field, get_value_callback, widget_list, list_walker)

Source code in ktoolbox/editor.py
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
def get_item(
        model: BaseModel,
        field: str,
        get_value_callback: Callable[[EditWithSignalWidget], Optional[Any]],
        widget_list: List[urwid.WidgetPlaceholder],
        list_walker: urwid.ListWalker
) -> Callable[[str], urwid.WidgetPlaceholder]:
    def inner(edit_text: str = ""):
        item = urwid.WidgetPlaceholder(urwid.Widget())
        edit_widget = EditWithSignalWidget(
            edit_text=edit_text,
            align=urwid.LEFT,
            on_state_change=on_item_changed,
            user_data=(model, field, get_value_callback, widget_list, item)
        )
        columns_widget = urwid.Columns([
            edit_widget,
            urwid.Divider(),
            urwid.Divider(),
            urwid.Button(
                "Remove -",
                on_remove_item,
                (model, field, widget_list, item, list_walker)
            )
        ])
        item.original_widget = columns_widget
        return item

    return inner

get_value(item_types)

Source code in ktoolbox/editor.py
326
327
328
329
330
331
332
333
334
335
def get_value(item_types: Sequence[type]) -> Callable[[EditWithSignalWidget], Optional[Any]]:
    def inner(w: EditWithSignalWidget = None):
        for t in item_types:
            try:
                return t(w.get_edit_text()) if w is not None else t()
            except ValueError:
                continue
        return None

    return inner

has_changed()

Source code in ktoolbox/editor.py
133
134
def has_changed() -> bool:
    return bool(set(dump_envs(config)) - initial_envs)

menu(title, choices)

Source code in ktoolbox/editor.py
170
171
172
173
174
175
def menu(
        title: Union[str, Tuple[Hashable, str], List[Union[str, Tuple[Hashable, str]]]],
        choices: Iterable[urwid.Widget],
) -> urwid.ListBox:
    body = [urwid.Text(title, align=urwid.CENTER), urwid.Divider(), *choices]
    return urwid.ListBox(urwid.SimpleFocusListWalker(body))

menu_option(widget)

Return focus_map="reversed" Widget

Source code in ktoolbox/editor.py
137
138
139
def menu_option(widget: urwid.Widget) -> urwid.AttrMap:
    """Return ``focus_map="reversed"`` Widget"""
    return urwid.AttrMap(widget, None, focus_map="reversed")

model_to_widgets(model, fields=None)

Generate urwid widgets for Pydantic model

Parameters:

Name Type Description Default
model BaseModel

Pydantic model

required
fields Iterable[str]

Only generate for these fields, default to all fields.

None
Source code in ktoolbox/editor.py
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
def model_to_widgets(model: BaseModel, fields: Iterable[str] = None) -> Generator[urwid.Widget, Any, None]:
    """
    Generate urwid widgets for Pydantic model

    :param model: Pydantic model
    :param fields: Only generate for these fields, default to all fields.
    """
    for field, field_info in model.model_fields.items():
        if fields is not None and field not in fields:
            continue
        origin_annotation = getattr(field_info.annotation, '__origin__', None)
        annotation = get_args(field_info.annotation) if origin_annotation is Union else [field_info.annotation]

        if origin_annotation is Literal:
            radio_buttons = []
            for value in get_args(field_info.annotation):
                menu_option(urwid.RadioButton(
                    radio_buttons,
                    str(value),
                    model.__getattribute__(field) == value,
                    on_radio_button_change,
                    (model, field, value)
                ))
            yield sub_menu(field, radio_buttons)
        elif bool in annotation:
            yield menu_option(urwid.CheckBox(
                field,
                model.__getattribute__(field),
                on_state_change=on_checkbox_change,
                user_data=(model, field)
            ))
        elif any(map(lambda x: x in annotation, [str, int, float, Path])):
            yield menu_option(urwid.Columns([
                urwid.Text(f"{' ' * 4}{field}", align=urwid.LEFT),
                EditWithSignalWidget(
                    edit_text=str(model.__getattribute__(field)),
                    align=urwid.RIGHT,
                    on_state_change=on_edit_change,
                    user_data=(model, field, annotation)
                )
            ]))
        elif origin_annotation in [list, set, tuple]:
            item_types = get_args(field_info.annotation)
            widget_list = []
            widget, menu_widget = sub_menu_with_menu_widget(field, [])
            list_walker: urwid.SimpleFocusListWalker = menu_widget.body  # type: ignore
            widget_list.extend([
                get_item(model, field, get_value(item_types), widget_list, list_walker)
                (str(existed)) for existed in model.__getattribute__(field)
            ])
            # noinspection PyTypeChecker
            option_widget = menu_option(
                urwid.Button(
                    "Add +",
                    on_add_item,
                    (
                        model,
                        field,
                        get_value(item_types),
                        widget_list,
                        get_item(model, field, get_value(item_types), widget_list, list_walker),
                        list_walker
                    )
                )
            )
            list_walker.extend([urwid.Divider(), option_widget, urwid.Divider()])
            list_walker.extend(widget_list)
            yield widget
        elif isinstance(field_info.annotation, ModelMetaclass):
            yield sub_menu(field, model_to_widgets(model.__getattribute__(field)))
        else:
            yield sub_menu(
                field,
                [urwid.Text(
                    f"This option ({repr(field_info.annotation)}) is currently not supported for editing in "
                    "the graphical interface; please edit it in the '.env' or 'prod.env' file in the working directory."
                )]
            )
    yield urwid.Divider()
    yield menu_option(urwid.Button(
        f"View Document: {type(model).__name__}", lambda x: webbrowser.open(
            f"https://ktoolbox.readthedocs.io/latest/configuration/reference/#ktoolbox.configuration.{type(model).__name__}"
        )
    ))

on_add_item(_, user_data)

Call when add item to List/Set/Tuple field

Parameters:

Name Type Description Default
_ Button

Widget

required
user_data Tuple[BaseModel, str, Callable[[], Optional[Any]], Union[List[_T], List[None]], Callable[[], _T], Union[MonitoredFocusList[_T], ListWalker]]

(model, field, () -> (default value), item list, () -> (new item), menu widget)

required
Source code in ktoolbox/editor.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def on_add_item(
        _: urwid.Button,
        user_data: Tuple[
            BaseModel,
            str,
            Callable[[], Optional[Any]],
            Union[List[_T], List[None]],
            Callable[[], _T],
            Union[urwid.MonitoredFocusList[_T], urwid.ListWalker]
        ]
):
    """
    Call when add item to List/Set/Tuple field

    :param _: Widget
    :param user_data: (model, field, () -> (default value), item list, () -> (new item), menu widget)
    """
    model, field, get_default, item_list, get_new_widget, widget = user_data
    values = list(model.__getattribute__(field))
    values.append(get_default())
    model.__setattr__(field, values)
    new_widget = get_new_widget()
    item_list.append(new_widget)
    widget.append(new_widget)

on_checkbox_change(_, state, user_data)

Source code in ktoolbox/editor.py
184
185
186
def on_checkbox_change(_: urwid.CheckBox, state: bool, user_data: Tuple[BaseModel, str]):
    model, field = user_data
    model.__setattr__(field, state)

on_edit_change(widget, user_data)

Source code in ktoolbox/editor.py
263
264
265
266
267
268
269
270
271
def on_edit_change(widget: urwid.EditWithSignalWidget, user_data: Tuple[BaseModel, str, Iterable[type]]):
    model, field, annotation = user_data
    for field_type in annotation:
        try:
            model.__setattr__(field, field_type(widget.get_edit_text()))
        except ValueError:
            continue
        else:
            break

on_item_changed(widget, user_data)

Call when List/Set/Tuple field item changed

Parameters:

Name Type Description Default
widget EditWithSignalWidget

Widget

required
user_data Tuple[BaseModel, str, Callable[[EditWithSignalWidget], Any], Union[List[_T], List[None]], _T]

(model, field, (edit widget) -> (value), item list, item)

required
Source code in ktoolbox/editor.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def on_item_changed(
        widget: EditWithSignalWidget,
        user_data: Tuple[
            BaseModel,
            str,
            Callable[[EditWithSignalWidget], Any],
            Union[List[_T], List[None]],
            _T
        ]
):
    """
    Call when List/Set/Tuple field item changed

    :param widget: Widget
    :param user_data: (model, field, (edit widget) -> (value), item list, item)
    """
    model, field, get_value_callback, item_list, item = user_data
    values = list(model.__getattribute__(field))
    index = item_list.index(item)
    values[index] = get_value_callback(widget)
    model.__setattr__(field, values)

on_radio_button_change(_, state, user_data)

Source code in ktoolbox/editor.py
178
179
180
181
def on_radio_button_change(_: urwid.RadioButton, state: bool, user_data: Tuple[BaseModel, str, Any]):
    if state:
        model, field, value = user_data
        model.__setattr__(field, value)

on_remove_item(_, user_data)

Call when remove item to List/Set/Tuple field

Parameters:

Name Type Description Default
_ Button

Widget

required
user_data Tuple[BaseModel, str, Union[List[_T], List[None]], _T, Union[MonitoredFocusList[_T], ListWalker]]

(model, field, item list, item, menu widget)

required
Source code in ktoolbox/editor.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def on_remove_item(
        _: urwid.Button,
        user_data: Tuple[
            BaseModel,
            str,
            Union[List[_T], List[None]],
            _T,
            Union[urwid.MonitoredFocusList[_T], urwid.ListWalker]
        ]
):
    """
    Call when remove item to List/Set/Tuple field

    :param _: Widget
    :param user_data: (model, field, item list, item, menu widget)
    """
    model, field, item_list, item, widget = user_data
    values = list(model.__getattribute__(field))
    index = item_list.index(item)
    values.pop(index)
    model.__setattr__(field, values)
    item_list.pop(index)
    widget.remove(item)

on_save_dotenv(_)

Source code in ktoolbox/editor.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
def on_save_dotenv(_: urwid.Button):
    if has_changed():
        pile = urwid.Pile([
            urwid.Text("Your changes have been saved."),
            urwid.Divider(),
            menu_option(urwid.Button(
                "OK", lambda x: top.back()
            )),
        ])
        try:
            save_dotenv()
        except Exception as e:
            pile = urwid.Pile([
                urwid.Text("Unable to save changes!"),
                urwid.Divider(),
                urwid.Text(f"{type(e).__name__}: {e}"),
                urwid.Divider(),
                menu_option(urwid.Button(
                    "OK", lambda x: top.back()
                )),
            ])
    else:
        pile = urwid.Pile([
            urwid.Text("Nothing has changed, no need to save."),
            urwid.Divider(),
            menu_option(urwid.Button(
                "OK", lambda x: top.back()
            )),
        ])
    top.open_box(urwid.Filler(pile))

run_config_editor()

Source code in ktoolbox/editor.py
455
456
def run_config_editor():
    urwid.MainLoop(top, palette=[("reversed", "standout", "")]).run()

save_dotenv()

Source code in ktoolbox/editor.py
118
119
120
121
122
123
124
125
126
127
128
129
130
def save_dotenv():
    current_envs = dump_envs(config)
    envs_to_dump = "\n".join(dump_modified_envs(current_envs))
    prod_dotenv_path = Path("prod.env")
    dotenv_path = Path(".env")
    if prod_dotenv_path.is_file():
        with prod_dotenv_path.open("w") as f:
            f.write(envs_to_dump)
    else:
        with dotenv_path.open("w") as f:
            f.write(envs_to_dump)
    initial_envs.clear()
    initial_envs.update(current_envs)

sub_menu(caption, choices)

Source code in ktoolbox/editor.py
162
163
164
165
166
167
def sub_menu(
        caption: Union[str, Tuple[Hashable, str], List[Union[str, Tuple[Hashable, str]]]],
        choices: Iterable[urwid.Widget],
) -> urwid.AttrMap[urwid.Button]:
    button, _ = sub_menu_with_menu_widget(caption, choices)
    return button

sub_menu_with_menu_widget(caption, choices)

Source code in ktoolbox/editor.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def sub_menu_with_menu_widget(
        caption: Union[str, Tuple[Hashable, str], List[Union[str, Tuple[Hashable, str]]]],
        choices: Iterable[urwid.Widget],
) -> Tuple[urwid.AttrMap[urwid.Button], urwid.ListBox]:
    contents = menu(
        caption,
        list(choices) + [
            urwid.Divider(bottom=2),
            menu_option(urwid.Button(
                "Back", lambda x: top.back()
            ))
        ]
    )

    return menu_option(urwid.Button(
        [caption, "..."],
        lambda x: top.open_box(contents)
    )), contents

job

CreatorIndices

Bases: BaseKToolBoxData

Creator directory indices model

Record the path of each downloaded post.

Source code in ktoolbox/job/model.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class CreatorIndices(BaseKToolBoxData):
    """
    Creator directory indices model

    Record the path of each downloaded post.
    """
    creator_id: str
    """Creator ID"""
    service: str
    """Creator service"""
    posts: Dict[str, Post] = {}
    """All posts, ``id`` -> ``Post``"""
    posts_path: Dict[str, Path] = {}
    """Posts and their path, ``id`` -> ``Path``"""

creator_id: str instance-attribute

Creator ID

posts: Dict[str, Post] = {} class-attribute instance-attribute

All posts, id -> Post

posts_path: Dict[str, Path] = {} class-attribute instance-attribute

Posts and their path, id -> Path

service: str instance-attribute

Creator service

Job

Bases: BaseModel

Download job model

Source code in ktoolbox/job/model.py
13
14
15
16
17
18
19
20
21
22
23
24
class Job(BaseModel):
    """
    Download job model
    """
    path: Path
    """Directory path to save the file"""
    alt_filename: Optional[str] = None
    """Use this name if no filename given by the server"""
    server_path: str
    """The `path` part of download URL"""
    type: Optional[Literal[PostFileTypeEnum.Attachment, PostFileTypeEnum.File]] = None
    """Target file type"""

alt_filename: Optional[str] = None class-attribute instance-attribute

Use this name if no filename given by the server

path: Path instance-attribute

Directory path to save the file

server_path: str instance-attribute

The path part of download URL

type: Optional[Literal[PostFileTypeEnum.Attachment, PostFileTypeEnum.File]] = None class-attribute instance-attribute

Target file type

JobListData

Bases: BaseKToolBoxData

Download job list data model

For saving the list of jobs to disk.

Source code in ktoolbox/job/model.py
41
42
43
44
45
46
47
48
class JobListData(BaseKToolBoxData):
    """
    Download job list data model

    For saving the list of jobs to disk.
    """
    jobs: List[Job] = []
    """All jobs"""

jobs: List[Job] = [] class-attribute instance-attribute

All jobs

JobRunner

Source code in ktoolbox/job/runner.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
class JobRunner:
    def __init__(self, *, job_list: List[Job] = None, tqdm_class: std_tqdm = None, progress: bool = True):
        """
        Create a job runner

        :param job_list: Jobs to initial ``self._job_queue``
        :param tqdm_class: ``tqdm`` class to replace default ``tqdm.asyncio.tqdm``
        :param progress: Show progress bar
        """
        job_list = job_list or []
        self._job_queue: asyncio.Queue[Job] = asyncio.Queue()
        for job in job_list:
            self._job_queue.put_nowait(job)
        self._tqdm_class = tqdm_class
        self._progress = progress
        self._downloaders_with_task: Dict[Downloader, asyncio.Task] = {}
        self._concurrent_tasks: Set[asyncio.Task] = set()
        self._lock = asyncio.Lock()

    @property
    def finished(self):
        """
        Check if all jobs finished

        :return: ``False`` if **in process**, ``False`` otherwise
        """
        return not self._lock.locked()

    @cached_property
    def downloaders(self):
        """Get downloaders with task"""
        return MappingProxyType(self._downloaders_with_task)

    @property
    def waiting_size(self) -> int:
        """Get the number of jobs waiting to be processed"""
        return self._job_queue.qsize()

    @property
    def done_size(self) -> int:
        """Get the number of jobs that done"""
        size = 0
        for downloader, task in self._downloaders_with_task.items():
            if downloader.finished or task.done():
                size += 1
        return size

    @property
    def processing_size(self) -> int:
        """Get the number of jobs that in process"""
        return len(self._downloaders_with_task) - self.done_size

    async def processor(self) -> int:
        """
        Process each job in ``self._job_queue``

        :return: Number of jobs that failed
        """
        failed_num = 0
        while not self._job_queue.empty():
            job = await self._job_queue.get()

            # Create downloader
            url_parts = [config.downloader.scheme, config.api.files_netloc, job.server_path, '', '', '']
            url = str(urlunparse(url_parts))
            downloader = Downloader(
                url=url,
                path=job.path,
                designated_filename=job.alt_filename,
                server_path=job.server_path
            )

            # Create task
            task = asyncio.create_task(
                downloader.run(
                    tqdm_class=self._tqdm_class,
                    progress=self._progress
                )
            )
            self._downloaders_with_task[downloader] = task
            # task.add_done_callback(lambda _: self._downloaders_with_task.pop(downloader))
            #   Delete this for counting finished job tasks

            # Run task
            task_done_set, _ = await asyncio.wait([task], return_when=asyncio.FIRST_EXCEPTION)
            task_done = task_done_set.pop()
            try:
                exception = task_done.exception()
            except CancelledError as e:
                exception = e
            if not exception:  # raise Exception when cancelled or other exceptions
                ret = task_done.result()
                if ret.code == RetCodeEnum.Success:
                    logger.success(
                        generate_msg(
                            "Download success",
                            filename=ret.data
                        )
                    )
                elif ret.code == RetCodeEnum.FileExisted:
                    logger.warning(ret.message)
                else:
                    logger.error(ret.message)
                    failed_num += 1
            elif isinstance(exception, CancelledError):
                logger.warning(
                    generate_msg(
                        "Download cancelled",
                        filename=job.alt_filename
                    )
                )
            else:
                logger.error(
                    generate_msg(
                        "Download failed",
                        filename=job.alt_filename,
                        exception=exception
                    )
                )
                failed_num += 1
            self._job_queue.task_done()
        await self._job_queue.join()
        return failed_num

    async def _watch_status(self):
        """
        Watch running, completed, failed jobs
        """
        while not self._job_queue.empty():
            await asyncio.sleep(30)
            logger.info(f"Waiting: {self.waiting_size} / "
                        f"Running: {self.processing_size} / "
                        f"Completed: {self.done_size} "
                        f"({(self.done_size / (self.waiting_size + self.processing_size + self.done_size)) * 100:.2f}%)")

    async def start(self):
        """
        Start processing jobs concurrently

        It will **Block** until other call of ``self.start()`` method finished
        """
        failed_num = 0
        async with self._lock:
            self._concurrent_tasks.clear()
            for _ in range(config.job.count):
                task = asyncio.create_task(self.processor())
                self._concurrent_tasks.add(task)
                task.add_done_callback(self._concurrent_tasks.discard)
            _, (task_done_set, _) = await asyncio.gather(
                self._watch_status(),
                asyncio.wait(self._concurrent_tasks)
            )
            for task in task_done_set:
                try:
                    failed_num += task.result()
                except CancelledError:
                    pass
        if failed_num:
            logger.warning(generate_msg(f"{failed_num} jobs failed, download finished"))
        else:
            logger.success(generate_msg("All jobs in queue finished"))
        return failed_num

    async def add_jobs(self, *jobs: Job):
        """Add jobs to ``self._job_queue``"""
        for job in jobs:
            await self._job_queue.put(job)

    @staticmethod
    async def _force_cancel(target: asyncio.Task, wait_time: float = None) -> bool:
        """
        Force cancel ``asyncio.Task`` after ``wait_time`` seconds

        :param target: Target task
        :param wait_time: Seconds to wait before cancel (``0`` for skip one event loop run cycle)
        :return: Whether cancelled successfully
        """
        if wait_time is not None:
            await asyncio.sleep(wait_time)
        return target.cancel()

    async def cancel_downloader(self, target: Downloader) -> bool:
        """
        Cancel downloader

        :return: Whether cancelled successfully
        """
        task = self._downloaders_with_task[target]
        if not task.done():
            target.cancel()
            return await self._force_cancel(task, 0) or task.done()
        return True

done_size: int property

Get the number of jobs that done

downloaders cached property

Get downloaders with task

finished property

Check if all jobs finished

Returns:

Type Description

False if in process, False otherwise

processing_size: int property

Get the number of jobs that in process

waiting_size: int property

Get the number of jobs waiting to be processed

__init__(*, job_list=None, tqdm_class=None, progress=True)

Create a job runner

Parameters:

Name Type Description Default
job_list List[Job]

Jobs to initial self._job_queue

None
tqdm_class tqdm

tqdm class to replace default tqdm.asyncio.tqdm

None
progress bool

Show progress bar

True
Source code in ktoolbox/job/runner.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def __init__(self, *, job_list: List[Job] = None, tqdm_class: std_tqdm = None, progress: bool = True):
    """
    Create a job runner

    :param job_list: Jobs to initial ``self._job_queue``
    :param tqdm_class: ``tqdm`` class to replace default ``tqdm.asyncio.tqdm``
    :param progress: Show progress bar
    """
    job_list = job_list or []
    self._job_queue: asyncio.Queue[Job] = asyncio.Queue()
    for job in job_list:
        self._job_queue.put_nowait(job)
    self._tqdm_class = tqdm_class
    self._progress = progress
    self._downloaders_with_task: Dict[Downloader, asyncio.Task] = {}
    self._concurrent_tasks: Set[asyncio.Task] = set()
    self._lock = asyncio.Lock()

add_jobs(*jobs) async

Add jobs to self._job_queue

Source code in ktoolbox/job/runner.py
183
184
185
186
async def add_jobs(self, *jobs: Job):
    """Add jobs to ``self._job_queue``"""
    for job in jobs:
        await self._job_queue.put(job)

cancel_downloader(target) async

Cancel downloader

Returns:

Type Description
bool

Whether cancelled successfully

Source code in ktoolbox/job/runner.py
201
202
203
204
205
206
207
208
209
210
211
async def cancel_downloader(self, target: Downloader) -> bool:
    """
    Cancel downloader

    :return: Whether cancelled successfully
    """
    task = self._downloaders_with_task[target]
    if not task.done():
        target.cancel()
        return await self._force_cancel(task, 0) or task.done()
    return True

processor() async

Process each job in self._job_queue

Returns:

Type Description
int

Number of jobs that failed

Source code in ktoolbox/job/runner.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
async def processor(self) -> int:
    """
    Process each job in ``self._job_queue``

    :return: Number of jobs that failed
    """
    failed_num = 0
    while not self._job_queue.empty():
        job = await self._job_queue.get()

        # Create downloader
        url_parts = [config.downloader.scheme, config.api.files_netloc, job.server_path, '', '', '']
        url = str(urlunparse(url_parts))
        downloader = Downloader(
            url=url,
            path=job.path,
            designated_filename=job.alt_filename,
            server_path=job.server_path
        )

        # Create task
        task = asyncio.create_task(
            downloader.run(
                tqdm_class=self._tqdm_class,
                progress=self._progress
            )
        )
        self._downloaders_with_task[downloader] = task
        # task.add_done_callback(lambda _: self._downloaders_with_task.pop(downloader))
        #   Delete this for counting finished job tasks

        # Run task
        task_done_set, _ = await asyncio.wait([task], return_when=asyncio.FIRST_EXCEPTION)
        task_done = task_done_set.pop()
        try:
            exception = task_done.exception()
        except CancelledError as e:
            exception = e
        if not exception:  # raise Exception when cancelled or other exceptions
            ret = task_done.result()
            if ret.code == RetCodeEnum.Success:
                logger.success(
                    generate_msg(
                        "Download success",
                        filename=ret.data
                    )
                )
            elif ret.code == RetCodeEnum.FileExisted:
                logger.warning(ret.message)
            else:
                logger.error(ret.message)
                failed_num += 1
        elif isinstance(exception, CancelledError):
            logger.warning(
                generate_msg(
                    "Download cancelled",
                    filename=job.alt_filename
                )
            )
        else:
            logger.error(
                generate_msg(
                    "Download failed",
                    filename=job.alt_filename,
                    exception=exception
                )
            )
            failed_num += 1
        self._job_queue.task_done()
    await self._job_queue.join()
    return failed_num

start() async

Start processing jobs concurrently

It will Block until other call of self.start() method finished

Source code in ktoolbox/job/runner.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
async def start(self):
    """
    Start processing jobs concurrently

    It will **Block** until other call of ``self.start()`` method finished
    """
    failed_num = 0
    async with self._lock:
        self._concurrent_tasks.clear()
        for _ in range(config.job.count):
            task = asyncio.create_task(self.processor())
            self._concurrent_tasks.add(task)
            task.add_done_callback(self._concurrent_tasks.discard)
        _, (task_done_set, _) = await asyncio.gather(
            self._watch_status(),
            asyncio.wait(self._concurrent_tasks)
        )
        for task in task_done_set:
            try:
                failed_num += task.result()
            except CancelledError:
                pass
    if failed_num:
        logger.warning(generate_msg(f"{failed_num} jobs failed, download finished"))
    else:
        logger.success(generate_msg("All jobs in queue finished"))
    return failed_num

model

__all__ = ['Job', 'JobListData', 'CreatorIndices'] module-attribute

CreatorIndices

Bases: BaseKToolBoxData

Creator directory indices model

Record the path of each downloaded post.

Source code in ktoolbox/job/model.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class CreatorIndices(BaseKToolBoxData):
    """
    Creator directory indices model

    Record the path of each downloaded post.
    """
    creator_id: str
    """Creator ID"""
    service: str
    """Creator service"""
    posts: Dict[str, Post] = {}
    """All posts, ``id`` -> ``Post``"""
    posts_path: Dict[str, Path] = {}
    """Posts and their path, ``id`` -> ``Path``"""
creator_id: str instance-attribute

Creator ID

posts: Dict[str, Post] = {} class-attribute instance-attribute

All posts, id -> Post

posts_path: Dict[str, Path] = {} class-attribute instance-attribute

Posts and their path, id -> Path

service: str instance-attribute

Creator service

Job

Bases: BaseModel

Download job model

Source code in ktoolbox/job/model.py
13
14
15
16
17
18
19
20
21
22
23
24
class Job(BaseModel):
    """
    Download job model
    """
    path: Path
    """Directory path to save the file"""
    alt_filename: Optional[str] = None
    """Use this name if no filename given by the server"""
    server_path: str
    """The `path` part of download URL"""
    type: Optional[Literal[PostFileTypeEnum.Attachment, PostFileTypeEnum.File]] = None
    """Target file type"""
alt_filename: Optional[str] = None class-attribute instance-attribute

Use this name if no filename given by the server

path: Path instance-attribute

Directory path to save the file

server_path: str instance-attribute

The path part of download URL

type: Optional[Literal[PostFileTypeEnum.Attachment, PostFileTypeEnum.File]] = None class-attribute instance-attribute

Target file type

JobListData

Bases: BaseKToolBoxData

Download job list data model

For saving the list of jobs to disk.

Source code in ktoolbox/job/model.py
41
42
43
44
45
46
47
48
class JobListData(BaseKToolBoxData):
    """
    Download job list data model

    For saving the list of jobs to disk.
    """
    jobs: List[Job] = []
    """All jobs"""
jobs: List[Job] = [] class-attribute instance-attribute

All jobs

runner

__all__ = ['JobRunner'] module-attribute

JobRunner

Source code in ktoolbox/job/runner.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
class JobRunner:
    def __init__(self, *, job_list: List[Job] = None, tqdm_class: std_tqdm = None, progress: bool = True):
        """
        Create a job runner

        :param job_list: Jobs to initial ``self._job_queue``
        :param tqdm_class: ``tqdm`` class to replace default ``tqdm.asyncio.tqdm``
        :param progress: Show progress bar
        """
        job_list = job_list or []
        self._job_queue: asyncio.Queue[Job] = asyncio.Queue()
        for job in job_list:
            self._job_queue.put_nowait(job)
        self._tqdm_class = tqdm_class
        self._progress = progress
        self._downloaders_with_task: Dict[Downloader, asyncio.Task] = {}
        self._concurrent_tasks: Set[asyncio.Task] = set()
        self._lock = asyncio.Lock()

    @property
    def finished(self):
        """
        Check if all jobs finished

        :return: ``False`` if **in process**, ``False`` otherwise
        """
        return not self._lock.locked()

    @cached_property
    def downloaders(self):
        """Get downloaders with task"""
        return MappingProxyType(self._downloaders_with_task)

    @property
    def waiting_size(self) -> int:
        """Get the number of jobs waiting to be processed"""
        return self._job_queue.qsize()

    @property
    def done_size(self) -> int:
        """Get the number of jobs that done"""
        size = 0
        for downloader, task in self._downloaders_with_task.items():
            if downloader.finished or task.done():
                size += 1
        return size

    @property
    def processing_size(self) -> int:
        """Get the number of jobs that in process"""
        return len(self._downloaders_with_task) - self.done_size

    async def processor(self) -> int:
        """
        Process each job in ``self._job_queue``

        :return: Number of jobs that failed
        """
        failed_num = 0
        while not self._job_queue.empty():
            job = await self._job_queue.get()

            # Create downloader
            url_parts = [config.downloader.scheme, config.api.files_netloc, job.server_path, '', '', '']
            url = str(urlunparse(url_parts))
            downloader = Downloader(
                url=url,
                path=job.path,
                designated_filename=job.alt_filename,
                server_path=job.server_path
            )

            # Create task
            task = asyncio.create_task(
                downloader.run(
                    tqdm_class=self._tqdm_class,
                    progress=self._progress
                )
            )
            self._downloaders_with_task[downloader] = task
            # task.add_done_callback(lambda _: self._downloaders_with_task.pop(downloader))
            #   Delete this for counting finished job tasks

            # Run task
            task_done_set, _ = await asyncio.wait([task], return_when=asyncio.FIRST_EXCEPTION)
            task_done = task_done_set.pop()
            try:
                exception = task_done.exception()
            except CancelledError as e:
                exception = e
            if not exception:  # raise Exception when cancelled or other exceptions
                ret = task_done.result()
                if ret.code == RetCodeEnum.Success:
                    logger.success(
                        generate_msg(
                            "Download success",
                            filename=ret.data
                        )
                    )
                elif ret.code == RetCodeEnum.FileExisted:
                    logger.warning(ret.message)
                else:
                    logger.error(ret.message)
                    failed_num += 1
            elif isinstance(exception, CancelledError):
                logger.warning(
                    generate_msg(
                        "Download cancelled",
                        filename=job.alt_filename
                    )
                )
            else:
                logger.error(
                    generate_msg(
                        "Download failed",
                        filename=job.alt_filename,
                        exception=exception
                    )
                )
                failed_num += 1
            self._job_queue.task_done()
        await self._job_queue.join()
        return failed_num

    async def _watch_status(self):
        """
        Watch running, completed, failed jobs
        """
        while not self._job_queue.empty():
            await asyncio.sleep(30)
            logger.info(f"Waiting: {self.waiting_size} / "
                        f"Running: {self.processing_size} / "
                        f"Completed: {self.done_size} "
                        f"({(self.done_size / (self.waiting_size + self.processing_size + self.done_size)) * 100:.2f}%)")

    async def start(self):
        """
        Start processing jobs concurrently

        It will **Block** until other call of ``self.start()`` method finished
        """
        failed_num = 0
        async with self._lock:
            self._concurrent_tasks.clear()
            for _ in range(config.job.count):
                task = asyncio.create_task(self.processor())
                self._concurrent_tasks.add(task)
                task.add_done_callback(self._concurrent_tasks.discard)
            _, (task_done_set, _) = await asyncio.gather(
                self._watch_status(),
                asyncio.wait(self._concurrent_tasks)
            )
            for task in task_done_set:
                try:
                    failed_num += task.result()
                except CancelledError:
                    pass
        if failed_num:
            logger.warning(generate_msg(f"{failed_num} jobs failed, download finished"))
        else:
            logger.success(generate_msg("All jobs in queue finished"))
        return failed_num

    async def add_jobs(self, *jobs: Job):
        """Add jobs to ``self._job_queue``"""
        for job in jobs:
            await self._job_queue.put(job)

    @staticmethod
    async def _force_cancel(target: asyncio.Task, wait_time: float = None) -> bool:
        """
        Force cancel ``asyncio.Task`` after ``wait_time`` seconds

        :param target: Target task
        :param wait_time: Seconds to wait before cancel (``0`` for skip one event loop run cycle)
        :return: Whether cancelled successfully
        """
        if wait_time is not None:
            await asyncio.sleep(wait_time)
        return target.cancel()

    async def cancel_downloader(self, target: Downloader) -> bool:
        """
        Cancel downloader

        :return: Whether cancelled successfully
        """
        task = self._downloaders_with_task[target]
        if not task.done():
            target.cancel()
            return await self._force_cancel(task, 0) or task.done()
        return True
done_size: int property

Get the number of jobs that done

downloaders cached property

Get downloaders with task

finished property

Check if all jobs finished

Returns:

Type Description

False if in process, False otherwise

processing_size: int property

Get the number of jobs that in process

waiting_size: int property

Get the number of jobs waiting to be processed

__init__(*, job_list=None, tqdm_class=None, progress=True)

Create a job runner

Parameters:

Name Type Description Default
job_list List[Job]

Jobs to initial self._job_queue

None
tqdm_class tqdm

tqdm class to replace default tqdm.asyncio.tqdm

None
progress bool

Show progress bar

True
Source code in ktoolbox/job/runner.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def __init__(self, *, job_list: List[Job] = None, tqdm_class: std_tqdm = None, progress: bool = True):
    """
    Create a job runner

    :param job_list: Jobs to initial ``self._job_queue``
    :param tqdm_class: ``tqdm`` class to replace default ``tqdm.asyncio.tqdm``
    :param progress: Show progress bar
    """
    job_list = job_list or []
    self._job_queue: asyncio.Queue[Job] = asyncio.Queue()
    for job in job_list:
        self._job_queue.put_nowait(job)
    self._tqdm_class = tqdm_class
    self._progress = progress
    self._downloaders_with_task: Dict[Downloader, asyncio.Task] = {}
    self._concurrent_tasks: Set[asyncio.Task] = set()
    self._lock = asyncio.Lock()
add_jobs(*jobs) async

Add jobs to self._job_queue

Source code in ktoolbox/job/runner.py
183
184
185
186
async def add_jobs(self, *jobs: Job):
    """Add jobs to ``self._job_queue``"""
    for job in jobs:
        await self._job_queue.put(job)
cancel_downloader(target) async

Cancel downloader

Returns:

Type Description
bool

Whether cancelled successfully

Source code in ktoolbox/job/runner.py
201
202
203
204
205
206
207
208
209
210
211
async def cancel_downloader(self, target: Downloader) -> bool:
    """
    Cancel downloader

    :return: Whether cancelled successfully
    """
    task = self._downloaders_with_task[target]
    if not task.done():
        target.cancel()
        return await self._force_cancel(task, 0) or task.done()
    return True
processor() async

Process each job in self._job_queue

Returns:

Type Description
int

Number of jobs that failed

Source code in ktoolbox/job/runner.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
async def processor(self) -> int:
    """
    Process each job in ``self._job_queue``

    :return: Number of jobs that failed
    """
    failed_num = 0
    while not self._job_queue.empty():
        job = await self._job_queue.get()

        # Create downloader
        url_parts = [config.downloader.scheme, config.api.files_netloc, job.server_path, '', '', '']
        url = str(urlunparse(url_parts))
        downloader = Downloader(
            url=url,
            path=job.path,
            designated_filename=job.alt_filename,
            server_path=job.server_path
        )

        # Create task
        task = asyncio.create_task(
            downloader.run(
                tqdm_class=self._tqdm_class,
                progress=self._progress
            )
        )
        self._downloaders_with_task[downloader] = task
        # task.add_done_callback(lambda _: self._downloaders_with_task.pop(downloader))
        #   Delete this for counting finished job tasks

        # Run task
        task_done_set, _ = await asyncio.wait([task], return_when=asyncio.FIRST_EXCEPTION)
        task_done = task_done_set.pop()
        try:
            exception = task_done.exception()
        except CancelledError as e:
            exception = e
        if not exception:  # raise Exception when cancelled or other exceptions
            ret = task_done.result()
            if ret.code == RetCodeEnum.Success:
                logger.success(
                    generate_msg(
                        "Download success",
                        filename=ret.data
                    )
                )
            elif ret.code == RetCodeEnum.FileExisted:
                logger.warning(ret.message)
            else:
                logger.error(ret.message)
                failed_num += 1
        elif isinstance(exception, CancelledError):
            logger.warning(
                generate_msg(
                    "Download cancelled",
                    filename=job.alt_filename
                )
            )
        else:
            logger.error(
                generate_msg(
                    "Download failed",
                    filename=job.alt_filename,
                    exception=exception
                )
            )
            failed_num += 1
        self._job_queue.task_done()
    await self._job_queue.join()
    return failed_num
start() async

Start processing jobs concurrently

It will Block until other call of self.start() method finished

Source code in ktoolbox/job/runner.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
async def start(self):
    """
    Start processing jobs concurrently

    It will **Block** until other call of ``self.start()`` method finished
    """
    failed_num = 0
    async with self._lock:
        self._concurrent_tasks.clear()
        for _ in range(config.job.count):
            task = asyncio.create_task(self.processor())
            self._concurrent_tasks.add(task)
            task.add_done_callback(self._concurrent_tasks.discard)
        _, (task_done_set, _) = await asyncio.gather(
            self._watch_status(),
            asyncio.wait(self._concurrent_tasks)
        )
        for task in task_done_set:
            try:
                failed_num += task.result()
            except CancelledError:
                pass
    if failed_num:
        logger.warning(generate_msg(f"{failed_num} jobs failed, download finished"))
    else:
        logger.success(generate_msg("All jobs in queue finished"))
    return failed_num

model

__all__ = ['BaseKToolBoxData', 'SearchResult'] module-attribute

BaseKToolBoxData

Bases: BaseModel

Base class for all KToolBox data models.

Source code in ktoolbox/model.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class BaseKToolBoxData(BaseModel):
    """
    Base class for all KToolBox data models.
    """

    def __init__(self, **data: Any):
        super().__init__(**data)
        self.type = type(self)

    version: str = __version__
    type: Union[Type["BaseKToolBoxData"], str] = None

    @field_serializer('type')
    def _(self, value: Type["BaseKToolBoxData"], _info):
        return str(value)

type: Union[Type[BaseKToolBoxData], str] = None class-attribute instance-attribute

version: str = __version__ class-attribute instance-attribute

_(value, _info)

Source code in ktoolbox/model.py
24
25
26
@field_serializer('type')
def _(self, value: Type["BaseKToolBoxData"], _info):
    return str(value)

__init__(**data)

Source code in ktoolbox/model.py
17
18
19
def __init__(self, **data: Any):
    super().__init__(**data)
    self.type = type(self)

SearchResult

Bases: BaseKToolBoxData, Generic[_T]

Cli search result

Source code in ktoolbox/model.py
29
30
31
class SearchResult(BaseKToolBoxData, Generic[_T]):
    """Cli search result"""
    result: List[_T] = []

result: List[_T] = [] class-attribute instance-attribute

utils

__all__ = ['BaseRet', 'generate_msg', 'logger_init', 'dump_search', 'parse_webpage_url', 'uvloop_init'] module-attribute

BaseRet

Bases: BaseModel, Generic[_T]

Base data model of function return value

Source code in ktoolbox/utils.py
28
29
30
31
32
33
34
35
36
37
38
class BaseRet(BaseModel, Generic[_T]):
    """Base data model of function return value"""
    code: int = RetCodeEnum.Success.value
    message: str = ''
    exception: Optional[Exception] = None
    data: Optional[_T] = None

    model_config = ConfigDict(arbitrary_types_allowed=True)

    def __bool__(self):
        return self.code == RetCodeEnum.Success

code: int = RetCodeEnum.Success.value class-attribute instance-attribute

data: Optional[_T] = None class-attribute instance-attribute

exception: Optional[Exception] = None class-attribute instance-attribute

message: str = '' class-attribute instance-attribute

model_config = ConfigDict(arbitrary_types_allowed=True) class-attribute instance-attribute

__bool__()

Source code in ktoolbox/utils.py
37
38
def __bool__(self):
    return self.code == RetCodeEnum.Success
Source code in ktoolbox/utils.py
83
84
85
86
87
88
async def dump_search(result: List[BaseModel], path: Path):
    async with aiofiles.open(str(path), "w", encoding="utf-8") as f:
        await f.write(
            SearchResult(result=result)
            .model_dump_json(indent=config.json_dump_indent)
        )

generate_msg(title=None, **kwargs)

Generate message for BaseRet and logger

Parameters:

Name Type Description Default
title str

Message title

None
kwargs

Extra data

{}
Source code in ktoolbox/utils.py
41
42
43
44
45
46
47
48
49
def generate_msg(title: str = None, **kwargs):
    """
    Generate message for ``BaseRet`` and logger

    :param title: Message title
    :param kwargs: Extra data
    """
    title: str = title or ""
    return f"{title} - {kwargs}" if kwargs else title

logger_init(cli_use=False, disable_stdout=False)

Initialize loguru logger

Parameters:

Name Type Description Default
cli_use bool

Set logger level INFO and filter out SUCCESS

False
disable_stdout bool

Disable default output stream

False
Source code in ktoolbox/utils.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def logger_init(cli_use: bool = False, disable_stdout: bool = False):
    """
    Initialize ``loguru`` logger

    :param cli_use: Set logger level ``INFO`` and filter out ``SUCCESS``
    :param disable_stdout: Disable default output stream
    """
    if disable_stdout:
        logger.remove()
    elif cli_use:
        logger.remove()
        logger.add(
            tqdm.write,
            colorize=True,
            format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
                   "<level>{level: <8}</level> | "
                   "<cyan>{name}</cyan> - <level>{message}</level>",
            level=logging.INFO,
            filter=lambda record: record["level"].name != "SUCCESS"
        )
    if path := config.logger.path:
        path.mkdir(exist_ok=True)
        if path is not None:
            logger.add(
                path / DataStorageNameEnum.LogData.value,
                level=config.logger.level,
                rotation=config.logger.rotation,
                diagnose=True
            )

parse_webpage_url(url)

Fetch service, user_id, post_id from webpage url

Each part can be None if not found in url.

Parameters:

Name Type Description Default
url str

Kemono Webpage url

required

Returns:

Type Description
Tuple[Optional[str], Optional[str], Optional[str]]

Tuple of service, user_id, post_id

Source code in ktoolbox/utils.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def parse_webpage_url(url: str) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    # noinspection SpellCheckingInspection
    """
    Fetch **service**, **user_id**, **post_id** from webpage url

    Each part can be ``None`` if not found in url.

    :param url: Kemono Webpage url
    :return: Tuple of **service**, **user_id**, **post_id**
    """
    path_url = Path(url)
    parts = path_url.parts
    if (url_parts_len := len(parts)) < 7:
        # Pad to full size
        parts += tuple(None for _ in range(7 - url_parts_len))
    _scheme, _netloc, service, _user_key, user_id, _post_key, post_id = parts
    return service, user_id, post_id

uvloop_init()

Set event loop policy to uvloop if available.

Returns:

Type Description
bool

If uvloop enabled successfully

Source code in ktoolbox/utils.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def uvloop_init() -> bool:
    """
    Set event loop policy to uvloop if available.

    :return: If uvloop enabled successfully
    """
    if config.use_uvloop:
        if sys.platform == "win32":
            logger.debug("uvloop is not supported on Windows, but it's optional.")
        else:
            try:
                # noinspection PyUnresolvedReferences
                import uvloop
            except ModuleNotFoundError:
                logger.debug(
                    "uvloop is not installed, but it's optional. "
                    "You can install it with `pip install ktoolbox[uvloop]`"
                )
            else:
                asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
                logger.success("Set event loop policy to uvloop successfully.")
                return True
    return False