sqlalchemy.exc.IntegrityError: (pymysql.err.IntegrityError) (1062, "Duplicate entry 'ARTIt3a0TX7dMWrqu7nxSa9p260109' for key 'uix_source_id'")

D:\ainews\backend\venv\Scripts\python.exe -m backend.app 
 * Serving Flask app 'app'
 * Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:5000
 * Running on http://192.168.43.53:5000
Press CTRL+C to quit
Building prefix dict from the default dictionary ...
Loading model from cache C:\Users\lenovo\AppData\Local\Temp\jieba.cache
Loading model cost 1.007 seconds.
Prefix dict has been built successfully.
[2026-01-09 23:57:57,046] ERROR in app: Exception on /api/news/crawl_and_analyze [POST]
Traceback (most recent call last):
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1969, in _exec_single_context
    self.dialect.do_execute(
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\engine\default.py", line 922, in do_execute
    cursor.execute(statement, parameters)
  File "D:\ainews\backend\venv\lib\site-packages\pymysql\cursors.py", line 153, in execute
    result = self._query(query)
  File "D:\ainews\backend\venv\lib\site-packages\pymysql\cursors.py", line 322, in _query
    conn.query(q)
  File "D:\ainews\backend\venv\lib\site-packages\pymysql\connections.py", line 558, in query
    self._affected_rows = self._read_query_result(unbuffered=unbuffered)
  File "D:\ainews\backend\venv\lib\site-packages\pymysql\connections.py", line 822, in _read_query_result
    result.read()
  File "D:\ainews\backend\venv\lib\site-packages\pymysql\connections.py", line 1200, in read
    first_packet = self.connection._read_packet()
  File "D:\ainews\backend\venv\lib\site-packages\pymysql\connections.py", line 772, in _read_packet
    packet.raise_for_error()
  File "D:\ainews\backend\venv\lib\site-packages\pymysql\protocol.py", line 221, in raise_for_error
    err.raise_mysql_exception(self._data)
  File "D:\ainews\backend\venv\lib\site-packages\pymysql\err.py", line 143, in raise_mysql_exception
    raise errorclass(errno, errval)
pymysql.err.IntegrityError: (1062, "Duplicate entry 'ARTIt3a0TX7dMWrqu7nxSa9p260109' for key 'uix_source_id'")

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "D:\ainews\backend\venv\lib\site-packages\flask\app.py", line 2190, in wsgi_app
    response = self.full_dispatch_request()
  File "D:\ainews\backend\venv\lib\site-packages\flask\app.py", line 1486, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "D:\ainews\backend\venv\lib\site-packages\flask_cors\extension.py", line 165, in wrapped_function
    return cors_after_request(app.make_response(f(*args, **kwargs)))
  File "D:\ainews\backend\venv\lib\site-packages\flask\app.py", line 1484, in full_dispatch_request
    rv = self.dispatch_request()
  File "D:\ainews\backend\venv\lib\site-packages\flask\app.py", line 1469, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
  File "D:\ainews\backend\app.py", line 176, in crawl_and_analyze
    s.commit()
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1969, in commit
    trans.commit(_to_root=True)
  File "<string>", line 2, in commit
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\state_changes.py", line 139, in _go
    ret_value = fn(self, *arg, **kw)
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1256, in commit
    self._prepare_impl()
  File "<string>", line 2, in _prepare_impl
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\state_changes.py", line 139, in _go
    ret_value = fn(self, *arg, **kw)
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1231, in _prepare_impl
    self.session.flush()
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\session.py", line 4312, in flush
    self._flush(objects)
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\session.py", line 4447, in _flush
    with util.safe_reraise():
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\util\langhelpers.py", line 146, in __exit__
    raise exc_value.with_traceback(exc_tb)
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\session.py", line 4408, in _flush
    flush_context.execute()
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\unitofwork.py", line 466, in execute
    rec.execute(self)
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\unitofwork.py", line 642, in execute
    util.preloaded.orm_persistence.save_obj(
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\persistence.py", line 93, in save_obj
    _emit_insert_statements(
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\orm\persistence.py", line 1227, in _emit_insert_statements
    result = connection.execute(
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1416, in execute
    return meth(
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\sql\elements.py", line 517, in _execute_on_connection
    return connection._execute_clauseelement(
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1639, in _execute_clauseelement
    ret = self._execute_context(
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1848, in _execute_context
    return self._exec_single_context(
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1988, in _exec_single_context
    self._handle_dbapi_exception(
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\engine\base.py", line 2344, in _handle_dbapi_exception
    raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1969, in _exec_single_context
    self.dialect.do_execute(
  File "D:\ainews\backend\venv\lib\site-packages\sqlalchemy\engine\default.py", line 922, in do_execute
    cursor.execute(statement, parameters)
  File "D:\ainews\backend\venv\lib\site-packages\pymysql\cursors.py", line 153, in execute
    result = self._query(query)
  File "D:\ainews\backend\venv\lib\site-packages\pymysql\cursors.py", line 322, in _query
    conn.query(q)
  File "D:\ainews\backend\venv\lib\site-packages\pymysql\connections.py", line 558, in query
    self._affected_rows = self._read_query_result(unbuffered=unbuffered)
  File "D:\ainews\backend\venv\lib\site-packages\pymysql\connections.py", line 822, in _read_query_result
    result.read()
  File "D:\ainews\backend\venv\lib\site-packages\pymysql\connections.py", line 1200, in read
    first_packet = self.connection._read_packet()
  File "D:\ainews\backend\venv\lib\site-packages\pymysql\connections.py", line 772, in _read_packet
    packet.raise_for_error()
  File "D:\ainews\backend\venv\lib\site-packages\pymysql\protocol.py", line 221, in raise_for_error
    err.raise_mysql_exception(self._data)
  File "D:\ainews\backend\venv\lib\site-packages\pymysql\err.py", line 143, in raise_mysql_exception
    raise errorclass(errno, errval)
sqlalchemy.exc.IntegrityError: (pymysql.err.IntegrityError) (1062, "Duplicate entry 'ARTIt3a0TX7dMWrqu7nxSa9p260109' for key 'uix_source_id'")
[SQL: INSERT INTO news (source_id, title, url, imgurl, brief, content, summary, source, published_at) VALUES (%(source_id)s, %(title)s, %(url)s, %(imgurl)s, %(brief)s, %(content)s, %(summary)s, %(source)s, %(published_at)s)]
[parameters: {'source_id': 'ARTIt3a0TX7dMWrqu7nxSa9p260109', 'title': '新年新场景:机器人“新同事”今日上岗', 'url': 'https://news.cctv.com/2026/01/09/ARTIt3a0TX7dMWrqu7nxSa9p260109.shtml', 'imgurl': 'https://p5.img.cctvpic.com/photoworkspace/2026/01/09/2026010922453830786.jpg', 'brief': '2025年被称为具身智能产业化元年,一大批智能机器人正从实验室走进我们的真实世界,甚至成为我们身边的新同事。', 'content': '央视网消息:2025年被称为具身智能产业化元年,一大批智能机器人正从实验室走进我们的真实世界,甚至成为我们身边的新同事。他们如何在新岗位上大显身手?\n浙江温州,巡特警大队巡逻队伍里,出警巡逻的铁球引来不少市民围观,这不是普通的铁球,而是一个球形机器人。\n这款球形机器人是当地刚刚“入职”的“新警 ... (1695 characters truncated) ... n在真实世界里,机器人正以意想不到的各种形态,悄然融入我们的生活。工程师们不再追求模仿人类的外表,而是让机器人成为特定场景中的“专家”。展望“十五五”,我国正大力推动机器人产业与人工智能等新技术深度融合,加速技术迭代。未来,具身智能将像手机一样,深刻融入我们的生产和生活,开启人机共融的智能新时代。', 'summary': '王酉他们增加了一个执行机构,同时增加一套算法,采用双重系统共同对球形机器人的行驶姿态进行控制,球形机器人的稳定行驶速度,一下提高到13.8米/秒,相当于时速接近50公里。2025年,从运动赛场到生产一线、生活场景,我国的具身智能机器人正以肉眼可见的速度加速迭代', 'source': 'news', 'published_at': '2026-01-09 22:48:37'}]
(Background on this error at: https://sqlalche.me/e/20/gkpj)
127.0.0.1 - - [09/Jan/2026 23:57:57] "POST /api/news/crawl_and_analyze HTTP/1.1" 500 -
def crawl_and_analyze():
    s = db_session()
    try:
        rows = fetch_latest()
        count_new = 0
        body = (request.get_json(silent=True) or {})
        fc = bool(body.get('fetch_content', True))
        for r in rows:
            if s.query(News).filter_by(source_id=r['source_id']).first():
                continue
            n = News(
                source_id=r['source_id'],
                title=r['title'],
                url=r['url'],
                brief=r.get('brief'),
                source=r.get('source'),
                imgurl=r.get('image'),
                published_at=r.get('published_at')
            )
            if fc:
                try:
                    from .crawler.cctv import fetch_content
                    n.content = fetch_content(n.url)
                except Exception:
                    n.content = n.content
            n.summary = generate_summary((n.content or "") + (n.brief or ""))
            s.add(n)
            count_new += 1
        s.commit()

The loop already deduplicates with

    if s.query(News).filter_by(source_id=r['source_id']).first():
        continue

so why does the error still occur at s.commit()?

This is a classic **race condition**. Here is why it happens:

Why it happens

  1. There is a time gap between the duplicate check and the insert

    # Time 1: at query time, the row is not in the database yet
    if s.query(News).filter_by(source_id=r['source_id']).first():
        continue  # nothing found, so keep going
    
    # Time 2: the insert is only staged in the session
    s.add(n)
    
    # Time 3: by commit time, another request has already inserted the same source_id
    s.commit()  # 💥 IntegrityError!
    
  2. Likely scenarios (see the minimal reproduction sketch after this list)

    • Two crawl requests were triggered at the same time
    • The previous request's transaction had not finished committing yet
    • Both sessions passed the duplicate check, then both tried to insert
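
To make the race concrete, here is a minimal sketch that reuses the db_session factory and News model from the code above, assuming db_session() hands out an independent session per caller (as two concurrent requests in separate threads would get). The second commit is the one that raises error 1062:

from sqlalchemy.exc import IntegrityError

# Two independent sessions, standing in for two concurrent POSTs
# to /api/news/crawl_and_analyze.
s1, s2 = db_session(), db_session()

dup_id = 'ARTIt3a0TX7dMWrqu7nxSa9p260109'

# Both sessions run the duplicate check before either has committed,
# so both see "no such row" and both decide to insert.
assert s1.query(News).filter_by(source_id=dup_id).first() is None
assert s2.query(News).filter_by(source_id=dup_id).first() is None

s1.add(News(source_id=dup_id, title='demo', url='https://example.com/a'))
s2.add(News(source_id=dup_id, title='demo', url='https://example.com/b'))

s1.commit()      # first writer wins
try:
    s2.commit()  # 💥 IntegrityError (1062) on key 'uix_source_id'
except IntegrityError:
    s2.rollback()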

Solutions

Solution 1: Catch the exception and skip the duplicate (recommended)

from sqlalchemy.exc import IntegrityError

def crawl_and_analyze():
    s = db_session()
    try:
        rows = fetch_latest()
        count_new = 0
        body = (request.get_json(silent=True) or {})
        fc = bool(body.get('fetch_content', True))
        
        for r in rows:
            # Check whether the record already exists first
            if s.query(News).filter_by(source_id=r['source_id']).first():
                continue
            
            n = News(
                source_id=r['source_id'],
                title=r['title'],
                url=r['url'],
                brief=r.get('brief'),
                source=r.get('source'),
                imgurl=r.get('image'),
                published_at=r.get('published_at')
            )
            
            if fc:
                try:
                    from .crawler.cctv import fetch_content
                    n.content = fetch_content(n.url)
                except Exception:
                    pass  # keep the existing content unchanged
            
            n.summary = generate_summary((n.content or "") + (n.brief or ""))
            s.add(n)
            
            # 🔧 Commit each record on its own and catch duplicate-key errors
            try:
                s.commit()
                count_new += 1
            except IntegrityError as e:
                s.rollback()  # roll back just this record
                print(f"Skipping duplicate: {r['source_id']}")
                continue
        
        return jsonify({'status': 'success', 'new_count': count_new}), 200
        
    except Exception as e:
        s.rollback()
        return jsonify({'status': 'error', 'message': str(e)}), 500
    finally:
        s.close()

Solution 2: Catch the exception on the batch commit

from sqlalchemy.exc import IntegrityError

def crawl_and_analyze():
    s = db_session()
    try:
        rows = fetch_latest()
        count_new = 0
        body = (request.get_json(silent=True) or {})
        fc = bool(body.get('fetch_content', True))
        
        for r in rows:
            if s.query(News).filter_by(source_id=r['source_id']).first():
                continue
            
            n = News(
                source_id=r['source_id'],
                title=r['title'],
                url=r['url'],
                brief=r.get('brief'),
                source=r.get('source'),
                imgurl=r.get('image'),
                published_at=r.get('published_at')
            )
            
            if fc:
                try:
                    from .crawler.cctv import fetch_content
                    n.content = fetch_content(n.url)
                except Exception:
                    pass
            
            n.summary = generate_summary((n.content or "") + (n.brief or ""))
            s.add(n)
            count_new += 1
        
        # 🔧 Catch duplicate-key errors from the batch commit
        try:
            s.commit()
        except IntegrityError as e:
            s.rollback()
            # If the batch commit fails, fall back to committing one record at a time
            print("Batch commit failed, trying one by one...")
            count_new = 0
            for r in rows:
                if s.query(News).filter_by(source_id=r['source_id']).first():
                    continue
                
                n = News(
                    source_id=r['source_id'],
                    title=r['title'],
                    url=r['url'],
                    brief=r.get('brief'),
                    source=r.get('source'),
                    imgurl=r.get('image'),
                    published_at=r.get('published_at')
                )
                
                if fc:
                    try:
                        from .crawler.cctv import fetch_content
                        n.content = fetch_content(n.url)
                    except Exception:
                        pass
                
                n.summary = generate_summary((n.content or "") + (n.brief or ""))
                s.add(n)
                
                try:
                    s.commit()
                    count_new += 1
                except IntegrityError:
                    s.rollback()
                    continue
        
        return jsonify({'status': 'success', 'new_count': count_new}), 200
        
    except Exception as e:
        s.rollback()
        return jsonify({'status': 'error', 'message': str(e)}), 500
    finally:
        s.close()

Solution 3: Use a database lock (safest, but lower performance)

def crawl_and_analyze():
    s = db_session()
    try:
        rows = fetch_latest()
        count_new = 0
        body = (request.get_json(silent=True) or {})
        fc = bool(body.get('fetch_content', True))
        
        for r in rows:
            # 🔧 Lock the matching row with with_for_update during the check
            existing = s.query(News).filter_by(
                source_id=r['source_id']
            ).with_for_update().first()
            
            if existing:
                continue
            
            n = News(
                source_id=r['source_id'],
                title=r['title'],
                url=r['url'],
                brief=r.get('brief'),
                source=r.get('source'),
                imgurl=r.get('image'),
                published_at=r.get('published_at')
            )
            
            if fc:
                try:
                    from .crawler.cctv import fetch_content
                    n.content = fetch_content(n.url)
                except Exception:
                    pass
            
            n.summary = generate_summary((n.content or "") + (n.brief or ""))
            s.add(n)
            s.commit()  # commit right away to release the lock
            count_new += 1
        
        return jsonify({'status': 'success', 'new_count': count_new}), 200
        
    except Exception as e:
        s.rollback()
        return jsonify({'status': 'error', 'message': str(e)}), 500
    finally:
        s.close()

Recommendation

I recommend Solution 1, because:

  • ✅ Simple and direct, easy to understand
  • ✅ Good performance; it does not block other requests
  • ✅ Handles the concurrency problem gracefully
  • ✅ Each record is committed independently, so one failure does not affect the rest

Remember to add this import at the top of the file:

from sqlalchemy.exc import IntegrityError

That takes care of the duplicate-insert errors for good!
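
For context, the key name 'uix_source_id' in the error message is the name of a unique constraint on news.source_id. The model definition is not shown in the question, but a plausible sketch (assumed, not taken from the original code; column types are guesses) matching the columns in the failing INSERT would look like this:

from sqlalchemy import Column, DateTime, Integer, String, Text, UniqueConstraint
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class News(Base):
    __tablename__ = 'news'
    # The constraint name below is what MySQL reports as key 'uix_source_id'
    # when a second row with the same source_id is inserted.
    __table_args__ = (UniqueConstraint('source_id', name='uix_source_id'),)

    id = Column(Integer, primary_key=True)           # assumed surrogate key
    source_id = Column(String(64), nullable=False)
    title = Column(String(255))
    url = Column(String(512))
    imgurl = Column(String(512))
    brief = Column(Text)
    content = Column(Text)
    summary = Column(Text)
    source = Column(String(32))
    published_at = Column(DateTime)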