Files
llm-intelligence/scripts/real_intraday_llm_provider.py
phamnazage-jpg 88833fac8b
Some checks failed
CI / go-test (push) Has been cancelled
CI / scripts-regression (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / docker-build (push) Has been cancelled
feat(intraday): monitor DeepSeek official page drift
2026-05-27 22:01:20 +08:00

60 lines
2.1 KiB
Python

#!/usr/bin/env python3
import json
import os
import sys
import urllib.request
# Intraday candidate discovery: reads a JSON request from stdin, asks an
# OpenRouter-hosted model to extract candidate events from the supplied search
# results, and prints a JSON array of candidates to stdout.
#
# Every failure path prints a one-line message to stderr and exits with
# status 1, so a calling process can rely on the exit code.

API_URL = 'https://openrouter.ai/api/v1/chat/completions'
MODEL_NAME = 'deepseek/deepseek-v4-flash'
REQUEST_TIMEOUT_SECONDS = 60
# Keys under which a wrapped candidate array is accepted (checked in order).
WRAPPER_KEYS = ('items', 'candidates')


def fail(message):
    """Print *message* to stderr and terminate the script with exit status 1."""
    print(message, file=sys.stderr)
    raise SystemExit(1)


def parse_request(payload):
    """Decode the stdin payload into ``(search_results, date)``.

    Empty or whitespace-only input is treated as an empty request ``{}``.

    Raises:
        ValueError: if the payload is not valid JSON or is not a JSON object.
    """
    text = (payload or '').strip()
    request_data = json.loads(text or '{}')
    if not isinstance(request_data, dict):
        raise ValueError('request payload must be a JSON object')
    return request_data.get('search_results', []), request_data.get('date', '')


def build_prompt(search_results, date):
    """Build the single user message sent to the model."""
    instructions = (
        "你是大模型情报候选发现器。根据给定搜索结果,只输出 JSON 数组,不要输出 markdown。"
        "每项字段必须包含 event_type, provider_name, model_name, provider_country, title, summary, candidate_urls。"
        "event_type 只能是 price_cut, price_increase, official_release, promo_campaign, leak_or_rumor, unknown。"
        "只有当搜索结果明确像是当天消息时才输出。没有 URL 的候选不要输出。"
    )
    context = f"\n日期: {date}\n搜索结果:\n" + json.dumps(search_results, ensure_ascii=False)
    return {"role": "user", "content": instructions + context}


def build_request_body(prompt):
    """Serialize the chat-completions request body as UTF-8 encoded JSON."""
    return json.dumps({
        "model": MODEL_NAME,
        "messages": [prompt],
        "temperature": 0,
        "max_tokens": 1200,
        "response_format": {"type": "json_object"}
    }).encode('utf-8')


def build_request(api_key, body):
    """Create the authenticated POST request for the chat-completions endpoint."""
    return urllib.request.Request(
        API_URL,
        data=body,
        headers={
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json',
            'HTTP-Referer': 'https://github.com/phamnazage-jpg/llm-intelligence',
            'X-Title': 'llm-intelligence intraday discovery'
        },
        method='POST'
    )


def fetch_completion(req):
    """Send *req* and return the decoded JSON response.

    Raises:
        RuntimeError: on an HTTP error status or a network-level failure.
        ValueError: if the response body is not valid JSON.
    """
    # Imported locally so this block does not depend on a file-level import.
    from urllib.error import HTTPError

    try:
        with urllib.request.urlopen(req, timeout=REQUEST_TIMEOUT_SECONDS) as resp:
            raw = resp.read()
    except HTTPError as exc:
        # Keep (a bounded part of) the error body; it usually names the cause.
        detail = exc.read().decode('utf-8', errors='replace')[:500]
        raise RuntimeError(f'HTTP {exc.code} from OpenRouter: {detail}') from exc
    except OSError as exc:
        # URLError and socket timeouts are both OSError subclasses.
        raise RuntimeError(f'request to OpenRouter failed: {exc}') from exc
    return json.loads(raw.decode('utf-8'))


def extract_content(data):
    """Return the first choice's message content from an API response.

    Raises:
        ValueError: if the response does not have the expected shape.
    """
    try:
        return data['choices'][0]['message']['content']
    except (KeyError, IndexError, TypeError) as exc:
        detail = data.get('error') if isinstance(data, dict) else None
        raise ValueError(f'unexpected API response: {detail or data!r}') from exc


def strip_code_fence(text):
    """Remove a surrounding markdown code fence (three backticks), if present."""
    stripped = text.strip()
    if not stripped.startswith('```'):
        return stripped
    # Drop the opening fence line (it may carry a language tag such as "json").
    lines = stripped.splitlines()[1:]
    if lines and lines[-1].strip() == '```':
        lines = lines[:-1]
    return '\n'.join(lines).strip()


def normalize_candidates(parsed):
    """Reduce decoded model output to a list of candidates.

    A list is returned as-is. A dict is unwrapped when it holds a list under
    ``items`` or ``candidates`` (in that order). Anything else yields ``[]``.

    NOTE(review): the prompt asks for a bare JSON array while the request sets
    ``response_format`` to ``json_object``; presumably that is why wrapped
    arrays are tolerated here. An object using any other wrapper key is
    dropped -- confirm whether the prompt should name the expected key.
    """
    if isinstance(parsed, list):
        return parsed
    if isinstance(parsed, dict):
        for key in WRAPPER_KEYS:
            value = parsed.get(key)
            if isinstance(value, list):
                return value
    return []


def parse_candidates(content):
    """Decode the model's text output into a list of candidates.

    Raises:
        ValueError: if *content* is not text or is not valid JSON.
    """
    if not isinstance(content, str):
        raise ValueError('model returned no text content')
    return normalize_candidates(json.loads(strip_code_fence(content)))


def main():
    """Run the provider: stdin request in, JSON candidate array out."""
    api_key = os.environ.get('OPENROUTER_API_KEY', '').strip()
    if not api_key:
        fail('OPENROUTER_API_KEY missing')
    try:
        search_results, date = parse_request(sys.stdin.read())
        body = build_request_body(build_prompt(search_results, date))
        response = fetch_completion(build_request(api_key, body))
        candidates = parse_candidates(extract_content(response))
    except (ValueError, RuntimeError) as exc:
        fail(f'intraday LLM provider failed: {exc}')
    print(json.dumps(candidates, ensure_ascii=False))


if __name__ == '__main__':
    main()