sqlalchemy.exc.IntegrityError: (pymysql.err.IntegrityError) (1062, "Duplicate entry 'ARTIt3a0TX7dMWrqu7nxSa9p260109' for key 'uix_source_id'")
D:\ainews\backend\venv\Scripts\python.exe -m backend.app
* Serving Flask app 'app'
* Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
* Running on all addresses (0.0.0.0)
* Running on http://127.0.0.1:5000
* Running on http://192.168.43.53:5000
Press CTRL+C to quit
Building prefix dict from the default dictionary ...
Loading model from cache C:\Users\lenovo\AppData\Local\Temp\jieba.cache
Loading model cost 1.007 seconds.
Prefix dict has been built successfully.
[2026-01-09 23:57:57,046] ERROR in app: Exception on /api/news/crawl_and_analyze [POST]
Traceback (most recent call last):
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1969, in _exec_single_context
self.dialect.do_execute(
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\engine\default.py", line 922, in do_execute
cursor.execute(statement, parameters)
File "D:\ainews\backend\venv\lib\site-packages\pymysql\cursors.py", line 153, in execute
result = self._query(query)
File "D:\ainews\backend\venv\lib\site-packages\pymysql\cursors.py", line 322, in _query
conn.query(q)
File "D:\ainews\backend\venv\lib\site-packages\pymysql\connections.py", line 558, in query
self._affected_rows = self._read_query_result(unbuffered=unbuffered)
File "D:\ainews\backend\venv\lib\site-packages\pymysql\connections.py", line 822, in _read_query_result
result.read()
File "D:\ainews\backend\venv\lib\site-packages\pymysql\connections.py", line 1200, in read
first_packet = self.connection._read_packet()
File "D:\ainews\backend\venv\lib\site-packages\pymysql\connections.py", line 772, in _read_packet
packet.raise_for_error()
File "D:\ainews\backend\venv\lib\site-packages\pymysql\protocol.py", line 221, in raise_for_error
err.raise_mysql_exception(self._data)
File "D:\ainews\backend\venv\lib\site-packages\pymysql\err.py", line 143, in raise_mysql_exception
raise errorclass(errno, errval)
pymysql.err.IntegrityError: (1062, "Duplicate entry 'ARTIt3a0TX7dMWrqu7nxSa9p260109' for key 'uix_source_id'")
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "D:\ainews\backend\venv\lib\site-packages\flask\app.py", line 2190, in wsgi_app
response = self.full_dispatch_request()
File "D:\ainews\backend\venv\lib\site-packages\flask\app.py", line 1486, in full_dispatch_request
rv = self.handle_user_exception(e)
File "D:\ainews\backend\venv\lib\site-packages\flask_cors\extension.py", line 165, in wrapped_function
return cors_after_request(app.make_response(f(*args, **kwargs)))
File "D:\ainews\backend\venv\lib\site-packages\flask\app.py", line 1484, in full_dispatch_request
rv = self.dispatch_request()
File "D:\ainews\backend\venv\lib\site-packages\flask\app.py", line 1469, in dispatch_request
return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
File "D:\ainews\backend\app.py", line 176, in crawl_and_analyze
s.commit()
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1969, in commit
trans.commit(_to_root=True)
File "<string>", line 2, in commit
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\state_changes.py", line 139, in _go
ret_value = fn(self, *arg, **kw)
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1256, in commit
self._prepare_impl()
File "<string>", line 2, in _prepare_impl
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\state_changes.py", line 139, in _go
ret_value = fn(self, *arg, **kw)
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1231, in _prepare_impl
self.session.flush()
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\session.py", line 4312, in flush
self._flush(objects)
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\session.py", line 4447, in _flush
with util.safe_reraise():
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\util\langhelpers.py", line 146, in __exit__
raise exc_value.with_traceback(exc_tb)
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\session.py", line 4408, in _flush
flush_context.execute()
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\unitofwork.py", line 466, in execute
rec.execute(self)
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\unitofwork.py", line 642, in execute
util.preloaded.orm_persistence.save_obj(
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\persistence.py", line 93, in save_obj
_emit_insert_statements(
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\persistence.py", line 1227, in _emit_insert_statements
result = connection.execute(
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1416, in execute
return meth(
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\sql\elements.py", line 517, in _execute_on_connection
return connection._execute_clauseelement(
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1639, in _execute_clauseelement
ret = self._execute_context(
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1848, in _execute_context
return self._exec_single_context(
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1988, in _exec_single_context
self._handle_dbapi_exception(
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\engine\base.py", line 2344, in _handle_dbapi_exception
raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1969, in _exec_single_context
self.dialect.do_execute(
File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\engine\default.py", line 922, in do_execute
cursor.execute(statement, parameters)
File "D:\ainews\backend\venv\lib\site-packages\pymysql\cursors.py", line 153, in execute
result = self._query(query)
File "D:\ainews\backend\venv\lib\site-packages\pymysql\cursors.py", line 322, in _query
conn.query(q)
File "D:\ainews\backend\venv\lib\site-packages\pymysql\connections.py", line 558, in query
self._affected_rows = self._read_query_result(unbuffered=unbuffered)
File "D:\ainews\backend\venv\lib\site-packages\pymysql\connections.py", line 822, in _read_query_result
result.read()
File "D:\ainews\backend\venv\lib\site-packages\pymysql\connections.py", line 1200, in read
first_packet = self.connection._read_packet()
File "D:\ainews\backend\venv\lib\site-packages\pymysql\connections.py", line 772, in _read_packet
packet.raise_for_error()
File "D:\ainews\backend\venv\lib\site-packages\pymysql\protocol.py", line 221, in raise_for_error
err.raise_mysql_exception(self._data)
File "D:\ainews\backend\venv\lib\site-packages\pymysql\err.py", line 143, in raise_mysql_exception
raise errorclass(errno, errval)
sqlalchemy.exc.IntegrityError: (pymysql.err.IntegrityError) (1062, "Duplicate entry 'ARTIt3a0TX7dMWrqu7nxSa9p260109' for key 'uix_source_id'")
[SQL: INSERT INTO news (source_id, title, url, imgurl, brief, content, summary, source, published_at) VALUES (%(source_id)s, %(title)s, %(url)s, %(imgurl)s, %(brief)s, %(content)s, %(summary)s, %(source)s, %(published_at)s)]
[parameters: {'source_id': 'ARTIt3a0TX7dMWrqu7nxSa9p260109', 'title': '新年新场景:机器人“新同事”今日上岗', 'url': 'https://news.cctv.com/2026/01/09/ARTIt3a0TX7dMWrqu7nxSa9p260109.shtml', 'imgurl': 'https://p5.img.cctvpic.com/photoworkspace/2026/01/09/2026010922453830786.jpg', 'brief': '2025年被称为具身智能产业化元年,一大批智能机器人正从实验室走进我们的真实世界,甚至成为我们身边的新同事。', 'content': '央视网消息:2025年被称为具身智能产业化元年,一大批智能机器人正从实验室走进我们的真实世界,甚至成为我们身边的新同事。他们如何在新岗位上大显身手?\n浙江温州,巡特警大队巡逻队伍里,出警巡逻的铁球引来不少市民围观,这不是普通的铁球,而是一个球形机器人。\n这款球形机器人是当地刚刚“入职”的“新警 ... (1695 characters truncated) ... n在真实世界里,机器人正以意想不到的各种形态,悄然融入我们的生活。工程师们不再追求模仿人类的外表,而是让机器人成为特定场景中的“专家”。展望“十五五”,我国正大力推动机器人产业与人工智能等新技术深度融合,加速技术迭代。未来,具身智能将像手机一样,深刻融入我们的生产和生活,开启人机共融的智能新时代。', 'summary': '王酉他们增加了一个执行机构,同时增加一套算法,采用双重系统共同对球形机器人的行驶姿态进行控制,球形机器人的稳定行驶速度,一下提高到13.8米/秒,相当于时速接近50公里。2025年,从运动赛场到生产一线、生活场景,我国的具身智能机器人正以肉眼可见的速度加速迭代', 'source': 'news', 'published_at': '2026-01-09 22:48:37'}]
(Background on this error at: https://sqlalche.me/e/20/gkpj)
127.0.0.1 - - [09/Jan/2026 23:57:57] "POST /api/news/crawl_and_analyze HTTP/1.1" 500 -
def crawl_and_analyze():
    s = db_session()
    try:
        rows = fetch_latest()
        count_new = 0
        body = (request.get_json(silent=True) or {})
        fc = bool(body.get('fetch_content', True))
        for r in rows:
            if s.query(News).filter_by(source_id=r['source_id']).first():
                continue
            n = News(
                source_id=r['source_id'],
                title=r['title'],
                url=r['url'],
                brief=r.get('brief'),
                source=r.get('source'),
                imgurl=r.get('image'),
                published_at=r.get('published_at')
            )
            if fc:
                try:
                    from .crawler.cctv import fetch_content
                    n.content = fetch_content(n.url)
                except Exception:
                    n.content = n.content
            n.summary = generate_summary((n.content or "") + (n.brief or ""))
            s.add(n)
            count_new += 1
        s.commit()

I clearly deduplicate with `if s.query(News).filter_by(source_id=r['source_id']).first(): continue`, so why does `s.commit()` still fail with this error?
This is a classic **race condition**. Here is why it happens:
Why it happens
There is a window of time between the duplicate check and the insert:
# Moment 1: at query time, the row does not exist in the database yet
if s.query(News).filter_by(source_id=r['source_id']).first():
    continue  # nothing found, so we keep going

# Moment 2: the new object is staged for insert
s.add(n)

# Moment 3: at commit time, another request has already inserted the same source_id
s.commit()  # 💥 IntegrityError!

Possible scenarios:
- Two crawl requests were triggered at the same time
- The previous request's transaction had not finished committing yet
- Both sessions passed the duplicate check, and then both tried to insert (the sketch below reproduces this)
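A minimal reproduction of the race, serialized into one script so it fails deterministically. It assumes the same News model and db_session factory from your code; the source_id value is hypothetical and only for illustration:
from sqlalchemy.exc import IntegrityError

# two independent sessions stand in for two concurrent requests
s1, s2 = db_session(), db_session()
sid = 'DEMO-DUPLICATE-ID'  # hypothetical id, for illustration only

# both sessions run the duplicate check before either has committed
assert s1.query(News).filter_by(source_id=sid).first() is None
assert s2.query(News).filter_by(source_id=sid).first() is None

s1.add(News(source_id=sid, title='a', url='http://example.com/a'))
s1.commit()  # the first insert succeeds

s2.add(News(source_id=sid, title='b', url='http://example.com/b'))
try:
    s2.commit()  # the second insert hits the uix_source_id unique key
except IntegrityError:
    s2.rollback()
    print('blocked by the unique constraint, not by the query check')
The application-level check can never close this window on its own; the unique constraint is what actually protects the table, so the code has to be prepared for the IntegrityError.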
Solutions
Option 1: Catch the exception and skip (recommended)
def crawl_and_analyze():
    s = db_session()
    try:
        rows = fetch_latest()
        count_new = 0
        body = (request.get_json(silent=True) or {})
        fc = bool(body.get('fetch_content', True))
        for r in rows:
            # first check whether the record already exists
            if s.query(News).filter_by(source_id=r['source_id']).first():
                continue
            n = News(
                source_id=r['source_id'],
                title=r['title'],
                url=r['url'],
                brief=r.get('brief'),
                source=r.get('source'),
                imgurl=r.get('image'),
                published_at=r.get('published_at')
            )
            if fc:
                try:
                    from .crawler.cctv import fetch_content
                    n.content = fetch_content(n.url)
                except Exception:
                    pass  # keep the existing content
            n.summary = generate_summary((n.content or "") + (n.brief or ""))
            s.add(n)
            # 🔧 commit each record on its own and catch duplicate-key errors
            try:
                s.commit()
                count_new += 1
            except IntegrityError as e:
                s.rollback()  # roll back just this record
                print(f"Skipping duplicate: {r['source_id']}")
                continue
        return jsonify({'status': 'success', 'new_count': count_new}), 200
    except Exception as e:
        s.rollback()
        return jsonify({'status': 'error', 'message': str(e)}), 500
    finally:
        s.close()
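If you would rather keep a single final commit instead of committing inside the loop, a SAVEPOINT per record is another variant. This is a minimal sketch of just the loop using Session.begin_nested(); it assumes the same News model, session, and rows as above, and that the table uses InnoDB (which supports savepoints):
from sqlalchemy.exc import IntegrityError

for r in rows:
    try:
        # SAVEPOINT: a duplicate rolls back only this record, not the whole batch
        with s.begin_nested():
            n = News(source_id=r['source_id'], title=r['title'], url=r['url'])
            s.add(n)
            s.flush()  # forces the INSERT now, so any IntegrityError surfaces here
        count_new += 1
    except IntegrityError:
        pass  # the savepoint was rolled back; keep processing the other rows
s.commit()  # one final commit for everything that survived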
Option 2: Catch the exception on the batch commit
from sqlalchemy.exc import IntegrityError
def crawl_and_analyze():
    s = db_session()
    try:
        rows = fetch_latest()
        count_new = 0
        body = (request.get_json(silent=True) or {})
        fc = bool(body.get('fetch_content', True))
        for r in rows:
            if s.query(News).filter_by(source_id=r['source_id']).first():
                continue
            n = News(
                source_id=r['source_id'],
                title=r['title'],
                url=r['url'],
                brief=r.get('brief'),
                source=r.get('source'),
                imgurl=r.get('image'),
                published_at=r.get('published_at')
            )
            if fc:
                try:
                    from .crawler.cctv import fetch_content
                    n.content = fetch_content(n.url)
                except Exception:
                    pass
            n.summary = generate_summary((n.content or "") + (n.brief or ""))
            s.add(n)
            count_new += 1
        # 🔧 catch duplicate-key errors on the batch commit
        try:
            s.commit()
        except IntegrityError as e:
            s.rollback()
            # if the batch commit fails, fall back to committing one record at a time
            print("Batch commit failed, trying one by one...")
            count_new = 0
            for r in rows:
                if s.query(News).filter_by(source_id=r['source_id']).first():
                    continue
                n = News(
                    source_id=r['source_id'],
                    title=r['title'],
                    url=r['url'],
                    brief=r.get('brief'),
                    source=r.get('source'),
                    imgurl=r.get('image'),
                    published_at=r.get('published_at')
                )
                if fc:
                    try:
                        from .crawler.cctv import fetch_content
                        n.content = fetch_content(n.url)
                    except Exception:
                        pass
                n.summary = generate_summary((n.content or "") + (n.brief or ""))
                s.add(n)
                try:
                    s.commit()
                    count_new += 1
                except IntegrityError:
                    s.rollback()
                    continue
        return jsonify({'status': 'success', 'new_count': count_new}), 200
    except Exception as e:
        s.rollback()
        return jsonify({'status': 'error', 'message': str(e)}), 500
    finally:
        s.close()
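Since the uix_source_id unique key already guards the table, another way to absorb duplicates is to push the deduplication into the INSERT itself with MySQL's INSERT IGNORE. A minimal SQLAlchemy Core sketch for a single row, assuming the same News model; the server silently skips rows whose source_id already exists:
from sqlalchemy.dialects.mysql import insert as mysql_insert

stmt = mysql_insert(News).values(
    source_id=r['source_id'],
    title=r['title'],
    url=r['url'],
).prefix_with("IGNORE")  # emits INSERT IGNORE INTO news ...
result = s.execute(stmt)
s.commit()
# result.rowcount is 1 for a new row, 0 if the source_id already existed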
Option 3: Use a database lock (safest, but lower throughput)
from sqlalchemy import select
from sqlalchemy.orm import Session
def crawl_and_analyze():
    s = db_session()
    try:
        rows = fetch_latest()
        count_new = 0
        body = (request.get_json(silent=True) or {})
        fc = bool(body.get('fetch_content', True))
        for r in rows:
            # 🔧 query with a row lock via with_for_update
            existing = s.query(News).filter_by(
                source_id=r['source_id']
            ).with_for_update().first()
            if existing:
                continue
            n = News(
                source_id=r['source_id'],
                title=r['title'],
                url=r['url'],
                brief=r.get('brief'),
                source=r.get('source'),
                imgurl=r.get('image'),
                published_at=r.get('published_at')
            )
            if fc:
                try:
                    from .crawler.cctv import fetch_content
                    n.content = fetch_content(n.url)
                except Exception:
                    pass
            n.summary = generate_summary((n.content or "") + (n.brief or ""))
            s.add(n)
            s.commit()  # commit immediately to release the lock
            count_new += 1
        return jsonify({'status': 'success', 'new_count': count_new}), 200
    except Exception as e:
        s.rollback()
        return jsonify({'status': 'error', 'message': str(e)}), 500
    finally:
        s.close()
Recommendation
I recommend Option 1, because it:
- ✅ is simple, direct, and easy to understand
- ✅ performs well and does not block other requests
- ✅ handles the concurrency gracefully
- ✅ commits each record independently, so one failure does not affect the others
Remember to add the import at the top of the file:
from sqlalchemy.exc import IntegrityError
With that in place, duplicate inserts are skipped instead of crashing the request.
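To confirm the fix, hit the endpoint again and check that it returns 200 with a new_count instead of the 500 from the log. A quick stdlib sketch, assuming the dev server is still listening on 127.0.0.1:5000:
import json
import urllib.request

req = urllib.request.Request(
    "http://127.0.0.1:5000/api/news/crawl_and_analyze",
    data=json.dumps({"fetch_content": True}).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(req) as resp:
    # expect 200 and {'status': 'success', 'new_count': ...}
    print(resp.status, json.loads(resp.read()))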