一个用于查询 Hive 数据并通过 AIGC 应用处理的 Python 包。
pypabhiveagent 提供了一个简单易用的接口,用于从 Hive 表查询数据,将数据发送到大模型应用(Prompt 或 Agent 类型)进行处理,解析结果并将处理后的数据存储回 Hive 表或本地 Excel 文件。
- ✅ Hive 数据查询:使用标准 SQL 查询 Hive 表数据
- ✅ DataFrame 输入:支持直接传入 pandas DataFrame 作为数据源
- ✅ DataFrame 输出:查询结果包含 DataFrame 对象,便于后续处理
- ✅ 大小写不敏感:列名匹配支持大小写不敏感,提高易用性
- ✅ AIGC 应用集成:支持 Prompt 和 Agent 两种应用类型
- ✅ 智能结果解析:自动解析 JSON 或字符串格式的结果
- ✅ 思维链支持:可选择获取 AIGC 的推理过程
- ✅ 多种存储方式:支持保存到 Hive 表或 Excel 文件
- ✅ 并发处理:使用多线程提高大数据集处理效率
- ✅ 完善的错误处理:详细的错误日志和重试机制
- ✅ 灵活的配置:支持链式调用和丰富的配置选项
pip install pypabhiveagent如果需要使用 Hive 查询功能,需要额外安装 Hive 支持:
# 安装包含 PySpark 支持
pip install pypabhiveagent[hive]git clone https://github.com/Nevernamed/pypabhiveagent.git
cd pypabhiveagent
pip install -e . # 基础安装
# 或
pip install -e .[hive] # 包含 Hive 支持- 基础安装:只包含核心功能(pandas, requests, openpyxl),支持 DataFrame 输入输出
- Hive 支持:需要额外安装
pypabhiveagent[hive]来获得 PySpark 支持,用于 Hive 查询和存储 - 注意:如果只使用 DataFrame 作为数据源,不需要安装 Hive 支持
from pypabhiveagent import HiveAgent
# 1. 创建 HiveAgent 实例
agent = HiveAgent()
# 2. 配置连接参数
agent.set_config(
# AIGC 服务配置
url="https://aigc-api.example.com/prompt",
app_id="your_app_id",
token="your_token",
aigc_app_id="prompt_app_12345"
)
# 3. 执行查询和处理
result = agent.query(
# SQL 查询语句
sql="SELECT id, question FROM question_table WHERE dt='20231201' LIMIT 10",
# 应用类型:'prompt' 或 'agent'
app_type='prompt',
# 字段映射:将 Hive 字段映射到 AIGC 参数
field_mapping={'question': None},
# 是否包含思维链结果
include_reasoning=False,
# 存储选项
cache_to_excel=True,
save_to_hive=True,
result_table_name='question_answer_result'
)
# 4. 查看结果
print(f"处理状态: {result['success']}")
print(f"处理消息: {result['message']}")
print(f"处理行数: {result['rows_processed']}")
# 获取结果 DataFrame 用于后续处理
if result.get('df') is not None:
result_df = result['df']
print(f"结果 DataFrame: {len(result_df)} 行 x {len(result_df.columns)} 列")
if result.get('excel_path'):
print(f"Excel 文件: {result['excel_path']}")
if result.get('hive_table'):
print(f"Hive 表: {result['hive_table']}")from pypabhiveagent import HiveAgent
result = (
HiveAgent()
.set_config(
url="https://aigc-api.example.com/prompt",
app_id="your_app_id",
token="your_token",
aigc_app_id="prompt_app_12345"
)
.query(
sql="SELECT id, text FROM my_table WHERE dt='20231201'",
app_type='prompt',
field_mapping={'text': None},
cache_to_excel=True,
save_to_hive=False
)
)
print(result['message'])
# 获取结果 DataFrame
result_df = result['df']使用 set_config() 方法配置以下必需参数:
| 参数 | 类型 | 说明 |
|---|---|---|
url |
str | AIGC 服务 URL |
app_id |
str | 应用 ID |
token |
str | 认证 token |
aigc_app_id |
str | AIGC 应用 ID |
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
timeout |
int | 60 | AIGC 请求超时时间(秒) |
max_retries |
int | 3 | 最大重试次数 |
sleep |
float | 2 | 请求间隔时间(秒),用于避免 API 限流 |
agent = HiveAgent()
agent.set_config(
# 必需参数
url="https://aigc-api.example.com/prompt",
app_id="your_app_id",
token="your_token",
aigc_app_id="prompt_app_12345",
# 可选参数
timeout=60, # 增加超时时间到 60 秒
max_retries=5, # 增加重试次数到 5 次
sleep=0.5 # 每个请求间隔 0.5 秒
)pypabhiveagent 支持两种 AIGC 应用类型:
基于提示词的应用,适合模板化的查询场景。
单参数输入示例:
result = agent.query(
sql="SELECT id, question FROM qa_table WHERE dt='20231201'",
app_type='prompt',
field_mapping={'question': None}, # 单参数
cache_to_excel=True,
save_to_hive=True,
result_table_name='qa_result'
)多参数输入示例:
result = agent.query(
sql="""
SELECT user_name, product_name, purchase_amount
FROM purchase_table
WHERE dt='20231201'
""",
app_type='prompt',
field_mapping={
'user_name': 'userName',
'product_name': 'productName',
'purchase_amount': 'amount'
}, # 多参数
cache_to_excel=True,
save_to_hive=True,
result_table_name='purchase_analysis'
)基于智能体的应用,适合复杂推理任务。
单参数输入示例:
result = agent.query(
sql="SELECT id, content FROM content_table WHERE dt='20231201'",
app_type='agent',
field_mapping={'content': None}, # 单参数
include_reasoning=True, # 包含思维链
cache_to_excel=True,
save_to_hive=True,
result_table_name='content_analysis'
)多参数输入示例:
result = agent.query(
sql="""
SELECT customer_name, order_status, order_amount
FROM order_table
WHERE dt='20231201'
""",
app_type='agent',
field_mapping={
'customer_name': 'customerName',
'order_status': 'status',
'order_amount': 'amount'
}, # 多参数
include_reasoning=True,
cache_to_excel=True,
save_to_hive=True,
result_table_name='order_analysis'
)字段映射用于指定 Hive 查询结果字段与 AIGC 应用参数的对应关系。
当只有一个输入参数时:
field_mapping = {'question': None}
# 或
field_mapping = {'question': 'query'}- Prompt 应用:值会放入请求体的
query字段 - Agent 应用:值会放入请求体的
message字段(格式:{'content': value, 'content_type': 'text'})
当有多个输入参数时:
field_mapping = {
'field1': 'param1',
'field2': 'param2',
'field3': 'param3'
}- Prompt 应用:键值对会放入请求体的
dynamicColumMap字段 - Agent 应用:键值对会放入请求体的
args字段
- 键:Hive 查询结果中的字段名
- 值:发送到 AIGC 应用的参数名(可以为
None,表示使用字段名)
pypabhiveagent 会自动解析 AIGC 返回的结果并合并到原始数据中。
如果 AIGC 返回 JSON 格式:
{"answer": "这是答案", "confidence": 0.95}会自动解析并添加为新列:
| id | question | answer | confidence |
|---|---|---|---|
| 1 | 问题1 | 这是答案 | 0.95 |
如果 AIGC 返回非 JSON 字符串,会创建 result 列:
| id | question | result |
|---|---|---|
| 1 | 问题1 | 这是答案 |
当 include_reasoning=True 时,会添加 reasoning_content 列:
| id | question | answer | reasoning_content |
|---|---|---|---|
| 1 | 问题1 | 这是答案 | 推理过程... |
设置 cache_to_excel=True 将结果保存到本地 Excel 文件:
result = agent.query(
sql="SELECT * FROM my_table",
app_type='prompt',
field_mapping={'text': None},
cache_to_excel=True # 保存到 Excel
)
print(f"Excel 文件: {result['excel_path']}")
# 输出: Excel 文件: prompt_app_12345-20231201143025.xlsx文件命名格式:{aigcAppId}-{年月日时分秒}.xlsx
设置 save_to_hive=True 将结果保存到 Hive 表:
result = agent.query(
sql="SELECT * FROM my_table WHERE dt='20231201'",
app_type='prompt',
field_mapping={'text': None},
save_to_hive=True,
result_table_name='my_result_table', # 必需
write_mode='append' # 'append' 或 'overwrite'
)
print(f"Hive 表: {result['hive_table']}")
# 输出: Hive 表: my_result_table自动表管理:
- 如果结果表不存在,会自动创建
- 如果源表有
dt字段,会创建分区表(按dt分区) - 自动推断字段类型
写入模式:
append(默认):追加数据到表中overwrite:覆盖表中的数据
使用 max_workers 参数控制并发线程数:
result = agent.query(
sql="SELECT * FROM large_table WHERE dt='20231201'",
app_type='prompt',
field_mapping={'text': None},
max_workers=10 # 使用 10 个并发线程
)性能建议:
- 默认值为 1,确保稳定性
- 如果 API 支持高并发,可以适当增加
max_workers(如 5-10) - 如果遇到 API 限流,降低
max_workers或增加sleep参数 - 建议每次处理 100-1000 行数据
除了使用 SQL 查询,pypabhiveagent 还支持直接传入 pandas DataFrame 作为数据源:
import pandas as pd
from pypabhiveagent import HiveAgent
# 创建或加载 DataFrame
data = {
'id': [1, 2, 3, 4, 5],
'Question': ['问题1', '问题2', '问题3', '问题4', '问题5'], # 注意大小写
'dt': ['20231201'] * 5
}
df = pd.DataFrame(data)
# 使用 DataFrame 作为数据源
agent = HiveAgent()
agent.set_config(
url="https://aigc-api.example.com/prompt",
app_id="your_app_id",
token="your_token",
aigc_app_id="prompt_app_12345"
)
result = agent.query(
df=df, # 传入 DataFrame 而不是 SQL
app_type='prompt',
field_mapping={'question': None}, # 大小写不敏感,会匹配到 'Question'
cache_to_excel=True,
save_to_hive=True,
result_table_name='result_table'
)使用场景:
- 从 CSV、Excel 等文件加载数据
- 从 API 获取的数据
- 已经在内存中处理过的数据
- 需要预处理或筛选的数据
查询结果中包含处理后的 DataFrame,可以直接用于后续处理:
result = agent.query(
sql="SELECT id, question FROM qa_table WHERE dt='20231201'",
app_type='prompt',
field_mapping={'question': None},
cache_to_excel=False,
save_to_hive=False
)
# 获取结果 DataFrame
if result['success'] and result['df'] is not None:
result_df = result['df']
# 进行后续处理
print(f"处理了 {len(result_df)} 行数据")
print(f"列名: {list(result_df.columns)}")
# 数据分析
print(result_df.describe())
# 保存到其他格式
result_df.to_csv('output.csv', index=False)
result_df.to_json('output.json', orient='records')
# 继续处理
filtered_df = result_df[result_df['score'] > 0.8]使用场景:
- 需要对结果进行进一步分析
- 保存到多种格式(CSV、JSON、Parquet 等)
- 与其他数据处理流程集成
- 数据质量检查和验证
field_mapping 中的列名匹配是大小写不敏感的,提高了易用性:
# DataFrame 的列名可能是各种大小写
df = pd.DataFrame({
'ID': [1, 2, 3],
'UserName': ['张三', '李四', '王五'],
'ProductName': ['产品A', '产品B', '产品C'],
'DT': ['20231201'] * 3
})
# field_mapping 中可以使用任意大小写
result = agent.query(
df=df,
app_type='prompt',
field_mapping={
'username': 'userName', # 会匹配到 'UserName'
'productname': 'productName' # 会匹配到 'ProductName'
}
)优势:
- 不需要担心列名的大小写
- 提高代码的健壮性
- 减少因大小写不匹配导致的错误
pypabhiveagent 提供完善的错误处理机制。当部分行处理失败时:
处理策略:
- ✅ 保留原始数据
- ✅ 结果字段填充 None 值
- ✅ 保持与成功数据的结构一致
- ✅ 打印失败统计和提醒信息
result = agent.query(
sql="SELECT * FROM my_table",
app_type='prompt',
field_mapping={'text': None}
)
# 查看处理结果
print(f"处理状态: {result['success']}")
print(f"处理消息: {result['message']}")
# 获取结果 DataFrame
if result['df'] is not None:
result_df = result['df']
# 识别失败的行(结果列为 None)
if 'answer' in result_df.columns:
failed_rows = result_df[result_df['answer'].isna()]
success_rows = result_df[result_df['answer'].notna()]
print(f"成功: {len(success_rows)} 行")
print(f"失败: {len(failed_rows)} 行")
# 查看失败详情
if result.get('errors'):
for error in result['errors'][:5]:
print(f"行 {error['row_index']}: {error['error']}")失败时的输出示例:
================================================================================
⚠️ PROCESSING COMPLETED WITH ERRORS
================================================================================
✓ Successfully processed: 8 rows
✗ Failed to process: 2 rows
📌 Failed rows are included in the result with:
- Original data preserved
- Result columns filled with None values
- Result columns: answer, confidence
💡 Tip: Filter failed rows using: df[df['result_column'].isna()]
================================================================================
数据结构示例:
# 成功的行
id | question | answer | confidence
1 | 问题1 | 答案1 | 0.95
# 失败的行(结果列为 None)
id | question | answer | confidence
2 | 问题2 | None | None错误类型:
ConfigurationError:配置错误HiveQueryError:Hive 查询错误AIGCRequestError:AIGC 请求错误ResultParseError:结果解析错误StorageError:存储错误
详细的错误处理指南请参考 docs/ERROR_HANDLING_GUIDE.md
pypabhiveagent 使用 Python 标准 logging 模块:
from pypabhiveagent import configure_logging, set_log_level
import logging
# 配置日志
configure_logging(
level=logging.INFO,
log_file='pypabhiveagent.log'
)
# 或者只设置日志级别
set_log_level(logging.DEBUG)查看 examples/ 目录获取更多示例:
basic_usage.py:基本使用示例prompt_app_example.py:Prompt 应用完整示例agent_app_example.py:Agent 应用完整示例dataframe_usage_example.py:DataFrame 输入输出示例error_handling_example.py:错误处理示例
或者使用 demo() 方法查看示例代码:
from pypabhiveagent import HiveAgent
# 直接打印完整的使用指南
HiveAgent.demo()pip install -r requirements-dev.txtpytestblack pypabhiveagent/mypy pypabhiveagent/flake8 pypabhiveagent/使用 sleep 参数设置请求间隔:
agent.set_config(
url="...",
app_id="...",
token="...",
aigc_app_id="...",
sleep=0.5 # 每个请求间隔 0.5 秒
)或者降低并发数:
result = agent.query(
sql="...",
app_type='prompt',
field_mapping={'text': None},
max_workers=3 # 降低并发数
)建议分批处理:
# 按日期分批处理
dates = ['20231201', '20231202', '20231203']
for date in dates:
result = agent.query(
sql=f"SELECT * FROM my_table WHERE dt='{date}'",
app_type='prompt',
field_mapping={'text': None},
result_table_name='my_result_table',
write_mode='append' # 追加模式
)
print(f"处理 {date}: {result['message']}")使用 timeout 参数:
agent.set_config(
url="...",
app_id="...",
token="...",
aigc_app_id="...",
timeout=60 # 超时时间 60 秒
)result = agent.query(
sql="...",
app_type='prompt',
field_mapping={'text': None},
cache_to_excel=True,
save_to_hive=False # 不保存到 Hive
)import pandas as pd
# 准备 DataFrame
df = pd.DataFrame({
'id': [1, 2, 3],
'question': ['问题1', '问题2', '问题3'],
'dt': ['20231201'] * 3
})
# 使用 DataFrame 作为数据源
result = agent.query(
df=df, # 传入 DataFrame
app_type='prompt',
field_mapping={'question': None}
)result = agent.query(
sql="SELECT * FROM my_table",
app_type='prompt',
field_mapping={'text': None}
)
# 获取结果 DataFrame
if result['success'] and result['df'] is not None:
result_df = result['df']
# 进行后续处理
result_df.to_csv('output.csv', index=False)
filtered_df = result_df[result_df['score'] > 0.8]不用担心!pypabhiveagent 支持大小写不敏感的列名匹配:
# DataFrame 列名是 'UserName',field_mapping 中可以使用 'username'
df = pd.DataFrame({'UserName': ['张三', '李四']})
result = agent.query(
df=df,
app_type='prompt',
field_mapping={'username': None} # 自动匹配到 'UserName'
)当前版本:1.0.0
主要特性:
- 支持 Hive SQL 查询和 DataFrame 输入
- 支持 Prompt 和 Agent 两种 AIGC 应用类型
- 大小写不敏感的列名匹配
- 完善的错误处理机制
- 灵活的存储选项(Excel 和 Hive)
MIT License - 详见 LICENSE 文件。
欢迎贡献!请随时提交 Pull Request。
如有问题或建议,请提交 Issue。