|
楼主 |
发表于 2025-6-26 17:11
|
显示全部楼层
import asyncio
import logging
from typing import List, Optional
import httpx
from pydantic import BaseModel, Field, ValidationError
# 基本配置
CHECK_INTERVAL_SECONDS = 60
STOCK_URLS = [
"https://api.store.nvidia.com/partner/v1/feinventory?status=1&skus=LCFEGF50LD90&locale=fr-fr",
"https://api.store.nvidia.com/partner/v1/feinventory?status=1&skus=PROFESHOP5090&locale=de-de",
]
# 日志配置
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
# 响应模型
class ListItem(BaseModel):
is_active: bool
product_url: Optional[str] = None
price: str
fe_sku: str
locale: str
def to_message(self) -> str:
""" "转换为通知消息"""
return (
f"SKU: {self.fe_sku}\n"
f"价格: {self.price}\n"
f"状态: {'有库存' if self.is_active else '无库存'}\n"
f"库存链接: {self.product_url or '无链接'}\n"
f"库存地区: {self.locale}"
)
async def send_notification(self):
"""发送通知"""
try:
response = httpx.post(
"https://api.pushover.net:443/1/messages.json",
data={
"token": "XXXXXXXXXXXXX",
"user": "YYYYYYYYYYYYY",
"message": f"发现新库存:\n{self.to_message()}",
},
)
response.raise_for_status()
except Exception as e:
logging.error(f"发送通知时发生错误: {e}")
class APIResponse(BaseModel):
success: bool
list_map: List[ListItem] = Field(alias="listMap")
async def check_stock(url: str):
"""检查库存"""
async with httpx.AsyncClient(timeout=10.0) as client:
while True:
logging.info(f"检查库存: {url}")
try:
headers = {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate, br, zstd",
"Accept-Language": "zh-CN,zh-TW;q=0.9,zh;q=0.8,en;q=0.7",
"Origin": "https://marketplace.nvidia.com",
"Referer": "https://marketplace.nvidia.com/",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-site",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36",
}
response = await client.get(url, headers=headers)
response.raise_for_status()
data = APIResponse.model_validate(response.json())
if not data.list_map:
logging.warning(f"响应数据错误:{data.model_dump_json()}")
else:
for product in data.list_map:
if product.is_active:
logging.info("发现新库存😊")
await product.send_notification()
if not any(item.is_active for item in data.list_map):
logging.info("没有库存😭")
except Exception as e:
logging.error(f"发生未知错误: {e}")
logging.info(f"等待 {CHECK_INTERVAL_SECONDS} 秒后进行下一次检查...")
await asyncio.sleep(CHECK_INTERVAL_SECONDS)
async def main():
"""主函数"""
tasks = [check_stock(url) for url in STOCK_URLS]
await asyncio.gather(*tasks)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.info("手动中断🤚")
前俩天看了另一个老哥的帖子顺手写了个。用pushover发的通知,pushover需要自己搞定。 |
|