Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion astrbot/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import logging

__version__ = "4.26.0-beta.12"
__version__ = "4.26.0"
logger = logging.getLogger("astrbot")
2 changes: 1 addition & 1 deletion astrbot/core/agent/runners/tool_loop_agent_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ async def _iter_llm_responses_with_fallback(
has_stream_output = False
with attempt:
try:
async for resp in self._iter_llm_responses(
async for resp in self._iter_llm_responses( # TODO 调用llm模型,获取流式响应
include_model=idx == 0
):
if resp.is_chunk:
Expand Down
4 changes: 2 additions & 2 deletions astrbot/core/config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@
),
"llm_compress_keep_recent_ratio": 0.15,
"llm_compress_provider_id": "",
"max_context_length": 50,
"dequeue_context_length": 10,
"max_context_length": -1, # 默认不限制
"dequeue_context_length": 1,
"streaming_response": False,
"show_tool_use_status": False,
"show_tool_call_result": False,
Expand Down
4 changes: 2 additions & 2 deletions astrbot/core/core_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,12 +302,12 @@ def _load(self) -> None:
for task in self.star_context._register_tasks:
extra_tasks.append(asyncio.create_task(task, name=task.__name__)) # type: ignore

tasks_ = [event_bus_task, *(extra_tasks if extra_tasks else [])]
tasks_ = [event_bus_task, *(extra_tasks if extra_tasks else [])] # [<Task pending name='event_bus' coro=<EventBus.dispatch() running at C:\Users\17875\Master\projects\github\AstrBot\astrbot\core\event_bus.py:39>>]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Avoid leaving hardcoded local development paths in comments. Please remove the redundant comment.

Suggested change
tasks_ = [event_bus_task, *(extra_tasks if extra_tasks else [])] # [<Task pending name='event_bus' coro=<EventBus.dispatch() running at C:\Users\17875\Master\projects\github\AstrBot\astrbot\core\event_bus.py:39>>]
tasks_ = [event_bus_task, *(extra_tasks if extra_tasks else [])]

if cron_task:
tasks_.append(cron_task)
if temp_dir_cleaner_task:
tasks_.append(temp_dir_cleaner_task)
for task in tasks_:
for task in tasks_: # 为每个任务创建 task
self.curr_tasks.append(
asyncio.create_task(self._task_wrapper(task), name=task.get_name()),
)
Expand Down
60 changes: 48 additions & 12 deletions astrbot/core/db/migration/migra_45_to_46.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,80 @@
# 导入全局日志记录器和共享偏好设置实例
from astrbot.api import logger, sp
# 导入 AstrBot 配置管理器
from astrbot.core.astrbot_config_mgr import AstrBotConfigManager
# 导入 UMOP 配置路由器
from astrbot.core.umop_config_router import UmopConfigRouter


async def migrate_45_to_46(acm: AstrBotConfigManager, ucr: UmopConfigRouter) -> None:
"""
执行从版本 4.5 到 4.6 的数据迁移。

主要变更:在 4.5 版本中,UMOP(统一消息对象标识符)路由信息存储在
abconf_data 的每个配置项内部(作为 'umop' 字段);在 4.6 版本中,
UMOP 路由被提取到独立的 UmopConfigRouter 中进行管理。

迁移过程:
1. 检测是否需要迁移(检查配置中是否存在 'umop' 字段)
2. 提取所有 umop 到 conf_id 的映射关系
3. 从原配置中移除 'umop' 字段
4. 更新配置存储和 UMOP 路由器

Args:
acm: AstrBot 配置管理器实例,包含旧版本的配置数据
ucr: UMOP 配置路由器实例,用于存储迁移后的路由数据
"""
# 获取当前的配置数据(包含可能的旧版本 umop 字段)
abconf_data = acm.abconf_data

# 验证配置数据类型是否为字典
if not isinstance(abconf_data, dict):
# should be unreachable
# 理论上不应该到达这里,但如果数据格式异常则记录警告并退出
logger.warning(
f"migrate_45_to_46: abconf_data is not a dict (type={type(abconf_data)}). Value: {abconf_data!r}",
)
return
return # 数据类型异常,无法进行迁移

# 如果任何一项带有 umop,则说明需要迁移
need_migration = False
# 检查是否需要执行迁移:
# 遍历所有配置项,查找是否包含旧版本的 'umop' 字段
need_migration = False # 迁移标志,默认为不需要
for conf_id, conf_info in abconf_data.items():
# 检查配置项是否为字典且包含 'umop' 键
if isinstance(conf_info, dict) and "umop" in conf_info:
need_migration = True
break
need_migration = True # 发现需要迁移的数据
break # 找到一个就足够了,跳出循环

# 如果没有需要迁移的数据,直接返回
if not need_migration:
return

# 记录迁移开始日志
logger.info("Starting migration from version 4.5 to 4.6")

# extract umo->conf_id mapping
umo_to_conf_id = {}
# 第一步:提取 umoconf_id 的映射关系
umo_to_conf_id = {} # 初始化 UMOP 到配置 ID 的映射字典
for conf_id, conf_info in abconf_data.items():
# 只处理包含 'umop' 字段的字典类型配置项
if isinstance(conf_info, dict) and "umop" in conf_info:
# 从配置项中取出并删除 'umop' 字段(pop 方法会同时删除该字段)
umop_ls = conf_info.pop("umop")
# 验证 umop 字段是否为列表类型
if not isinstance(umop_ls, list):
continue
continue # 如果不是列表,跳过该配置项
# 遍历 umop 列表中的每个 UMO 字符串
for umo in umop_ls:
# 确保 umo 是字符串类型且尚未存在于映射中
if isinstance(umo, str) and umo not in umo_to_conf_id:
# 建立 UMO 到配置 ID 的映射关系
umo_to_conf_id[umo] = conf_id

# update the abconf data
# 第二步:更新配置数据到持久化存储
# 将移除了 umop 字段的配置数据保存到 SharedPreferences
await sp.global_put("abconf_mapping", abconf_data)
# update the umop config router

# 第三步:更新 UMOP 配置路由器
# 将提取的映射关系批量更新到路由器中
await ucr.update_routing_data(umo_to_conf_id)

logger.info("Migration from version 45 to 46 completed successfully")
# 记录迁移完成日志
logger.info("Migration from version 45 to 46 completed successfully")
124 changes: 89 additions & 35 deletions astrbot/core/db/migration/migra_webchat_session.py
Original file line number Diff line number Diff line change
@@ -1,131 +1,185 @@
"""Migration script for WebChat sessions.
"""
WebChat 会话数据迁移脚本。

This migration creates PlatformSession from existing platform_message_history records.
此迁移从现有的 platform_message_history 记录中创建 PlatformSession 记录。

Changes:
- Creates platform_sessions table
- Adds platform_id field (default: 'webchat')
- Adds display_name field
- Session_id format: {platform_id}_{uuid}
变更内容:
- 创建 platform_sessions
- 添加 platform_id 字段(默认值:'webchat'
- 添加 display_name 字段
- Session_id 格式:{platform_id}_{uuid}
"""

# 导入 SQLAlchemy 的聚合函数和查询构建函数
from sqlalchemy import func, select
# 导入 SQLModel 的列选择函数,用于在查询中引用模型列
from sqlmodel import col

# 导入全局日志记录器和共享偏好设置实例
from astrbot.api import logger, sp
# 导入数据库基础操作类
from astrbot.core.db import BaseDatabase
# 导入数据库持久化对象(PO)模型
from astrbot.core.db.po import ConversationV2, PlatformMessageHistory, PlatformSession


async def migrate_webchat_session(db_helper: BaseDatabase) -> None:
"""Create PlatformSession records from platform_message_history.

This migration extracts all unique user_ids from platform_message_history
where platform_id='webchat' and creates corresponding PlatformSession records.
"""
# 检查是否已经完成迁移
从 platform_message_history 表创建 PlatformSession 记录。

此迁移提取 platform_message_history 中所有 platform_id='webchat' 的
唯一 user_id,并为每个用户创建对应的 PlatformSession 记录。
同时从 Conversations 表中获取对话标题作为会话的显示名称。

迁移过程:
1. 检查是否已完成迁移(通过偏好设置标记)
2. 查询所有 WebChat 用户的聊天历史记录
3. 检查已存在的会话,避免重复创建
4. 从 Conversations 表获取对话标题
5. 批量创建 PlatformSession 记录
6. 标记迁移完成

Args:
db_helper: 数据库操作助手实例,用于数据库访问和偏好设置管理
"""
# 检查迁移是否已经完成
# 从偏好设置中读取迁移完成标记
migration_done = await db_helper.get_preference(
"global", "global", "migration_done_webchat_session_1"
)
# 如果已经完成迁移,直接返回
if migration_done:
return

# 记录迁移开始日志
logger.info("开始执行数据库迁移(WebChat 会话迁移)...")

try:
# 获取数据库会话上下文管理器
async with db_helper.get_db() as session:
# 从 platform_message_history 创建 PlatformSession
# 构建查询:从 platform_message_history 中提取 WebChat 用户数据
query = (
select(
# 选择 user_id 字段(作为会话标识)
col(PlatformMessageHistory.user_id),
# 选择 sender_name 字段(发送者名称)
col(PlatformMessageHistory.sender_name),
# 使用聚合函数获取最早的创建时间
func.min(PlatformMessageHistory.created_at).label("earliest"),
# 使用聚合函数获取最晚的更新时间
func.max(PlatformMessageHistory.updated_at).label("latest"),
)
# 过滤条件:只查询 WebChat 平台的消息
.where(col(PlatformMessageHistory.platform_id) == "webchat")
# 过滤条件:排除机器人自己发送的消息
.where(col(PlatformMessageHistory.sender_id) != "bot")
# 按 user_id 分组,获取每个用户的聚合数据
.group_by(col(PlatformMessageHistory.user_id))
)

# 执行查询并获取结果
result = await session.execute(query)
# 获取所有查询结果行
webchat_users = result.all()

# 如果没有找到需要迁移的用户数据
if not webchat_users:
logger.info("没有找到需要迁移的 WebChat 数据")
# 直接标记迁移完成并返回
await sp.put_async(
"global", "global", "migration_done_webchat_session_1", True
)
return

# 记录找到的待迁移会话数量
logger.info(f"找到 {len(webchat_users)} 个 WebChat 会话需要迁移")

# 检查已存在的会话
# 查询已存在的 PlatformSession,避免重复创建
existing_query = select(col(PlatformSession.session_id))
existing_result = await session.execute(existing_query)
# 将已存在的 session_id 转换为集合,方便快速查找
existing_session_ids = {row[0] for row in existing_result.fetchall()}

# 查询 Conversations 表中的 title,用于设置 display_name
# 对于每个 user_id,对应的 conversation user_id 格式为: webchat:FriendMessage:webchat!astrbot!{user_id}
# 构建 Conversations 的 user_id 列表
# 格式: webchat:FriendMessage:webchat!astrbot!{user_id}
user_ids_to_query = [
f"webchat:FriendMessage:webchat!astrbot!{user_id}"
for user_id, _, _, _ in webchat_users
]
# 构建查询:获取 Conversations 的标题信息
conv_query = select(
col(ConversationV2.user_id), col(ConversationV2.title)
).where(col(ConversationV2.user_id).in_(user_ids_to_query))
col(ConversationV2.user_id), # 对话的用户 ID
col(ConversationV2.title) # 对话的标题
).where(
# 筛选出属于这些 WebChat 用户的对话
col(ConversationV2.user_id).in_(user_ids_to_query)
)
# 执行对话查询
conv_result = await session.execute(conv_query)
# 创建 user_id -> title 的映射字典
# 从 Conversations 的复合 user_id 中提取原始 user_id 作为键
title_map = {
user_id.replace("webchat:FriendMessage:webchat!astrbot!", ""): title
for user_id, title in conv_result.fetchall()
}

# 批量创建 PlatformSession 记录
# 准备批量创建的 PlatformSession 记录列表
sessions_to_add = []
# 记录跳过的会话数量(已存在的会话)
skipped_count = 0

# 遍历每个 WebChat 用户,创建对应的 PlatformSession
for user_id, sender_name, created_at, updated_at in webchat_users:
# user_id 就是 webchat_conv_id (session_id)
# user_id 直接作为 session_id 使用
session_id = user_id

# sender_name 通常是 username,但可能为 None
# 设置创建者名称,如果 sender_name 为空则使用 "guest"
creator = sender_name if sender_name else "guest"

# 检查是否已经存在该会话
# 检查该会话是否已经存在
if session_id in existing_session_ids:
logger.debug(f"会话 {session_id} 已存在,跳过")
skipped_count += 1
continue
skipped_count += 1 # 增加跳过计数
continue # 跳过已存在的会话

# 从 Conversations 表中获取 display_name
# 从 Conversations 表的映射中获取 display_name
display_name = title_map.get(user_id)

# 创建新的 PlatformSession(保留原有的时间戳)
# 创建新的 PlatformSession 对象,保留原始的时间戳信息
new_session = PlatformSession(
session_id=session_id,
platform_id="webchat",
creator=creator,
is_group=0,
created_at=created_at,
updated_at=updated_at,
display_name=display_name,
session_id=session_id, # 会话唯一标识
platform_id="webchat", # 平台标识
creator=creator, # 创建者名称
is_group=0, # 非群组会话(0 表示私聊)
created_at=created_at, # 原始创建时间
updated_at=updated_at, # 原始更新时间
display_name=display_name, # 显示名称(从对话标题获取)
)
# 添加到待插入列表
sessions_to_add.append(new_session)

# 批量插入
# 批量插入新创建的会话记录
if sessions_to_add:
# 使用 add_all 批量添加所有新会话
session.add_all(sessions_to_add)
# 提交事务,将所有更改写入数据库
await session.commit()

# 记录迁移完成统计信息
logger.info(
f"WebChat 会话迁移完成!成功迁移: {len(sessions_to_add)}, 跳过: {skipped_count}",
)
else:
# 没有新会话需要创建
logger.info("没有新会话需要迁移")

# 标记迁移完成
# 迁移成功完成,在偏好设置中标记完成状态
await sp.put_async("global", "global", "migration_done_webchat_session_1", True)

except Exception as e:
# 捕获迁移过程中的任何异常
# exc_info=True 会记录完整的异常堆栈信息
logger.error(f"迁移过程中发生错误: {e}", exc_info=True)
raise
# 重新抛出异常,让上层调用者处理
raise
28 changes: 14 additions & 14 deletions astrbot/core/db/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,21 @@ def __init__(self, db_path: str) -> None:

async def initialize(self) -> None:
"""Initialize the database by creating tables if they do not exist."""
async with self.engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
await conn.execute(text("PRAGMA journal_mode=WAL"))
await conn.execute(text("PRAGMA busy_timeout=30000"))
await conn.execute(text("PRAGMA synchronous=NORMAL"))
await conn.execute(text("PRAGMA cache_size=20000"))
await conn.execute(text("PRAGMA temp_store=MEMORY"))
await conn.execute(text("PRAGMA mmap_size=134217728"))
await conn.execute(text("PRAGMA optimize"))
async with self.engine.begin() as conn: # 开启数据库事务
await conn.run_sync(SQLModel.metadata.create_all) # 同步执行 SQLModel.metadata.create_all, 根据所有 SQLModel 模型定义,自动创建不存在的数据库表, create_all 是同步方法,需要用 run_sync 在异步环境中执行
await conn.execute(text("PRAGMA journal_mode=WAL")) # 设置日志模式为 WAL (Write-Ahead Logging), 允许并发读写,写入不阻塞读取,提升并发性能
await conn.execute(text("PRAGMA busy_timeout=30000")) # 设置忙等待超时为 30 秒(30000 毫秒),当数据库被锁定时,等待最多 30 秒而不是立即报错
await conn.execute(text("PRAGMA synchronous=NORMAL")) # 设置同步模式为 NORMAL,在安全性和性能间取得平衡,比 FULL 模式快,比 OFF 模式安全
await conn.execute(text("PRAGMA cache_size=20000")) # 设置缓存大小为 20000 页(约 80MB),增加内存缓存,减少磁盘 I/O,提升查询速度
await conn.execute(text("PRAGMA temp_store=MEMORY")) # 将临时表和索引存储在内存中,避免创建临时文件,提升临时操作的速度,
await conn.execute(text("PRAGMA mmap_size=134217728")) # 设置内存映射大小为 128MB (134217728 字节),将数据库文件映射到内存,减少 read/write 系统调用
await conn.execute(text("PRAGMA optimize")) # 执行数据库优, 分析表并更新查询优化器统计信息,提升查询性能化
# 确保 personas 表有 folder_id、sort_order、skills 列(前向兼容)
await self._ensure_persona_folder_columns(conn)
await self._ensure_persona_skills_column(conn)
await self._ensure_persona_custom_error_message_column(conn)
await self._ensure_platform_message_history_checkpoint_column(conn)
await conn.commit()
await self._ensure_persona_folder_columns(conn) # 确保 personas 表有 folder_id 和 sort_order 列,向前兼容,为旧版本数据库添加新字段
await self._ensure_persona_skills_column(conn) # 确保 personas 表有 skills 列,向前兼容,为旧数据库补充技能字段
await self._ensure_persona_custom_error_message_column(conn) # 确保 personas 表有 custom_error_message 列,向前兼容,为旧数据库添加自定义错误消息字段
await self._ensure_platform_message_history_checkpoint_column(conn) # 确保 platform_message_history 表有 llm_checkpoint_id 列,向前兼容,为旧数据库添加 llm_checkpoint_id 字段
await conn.commit() # 提交所有更改,确保表创建和 PRAGMA 设置持久化到数据库

async def _ensure_persona_folder_columns(self, conn) -> None:
"""确保 personas 表有 folder_id 和 sort_order 列。
Expand Down
Loading