首页技术专题博客目录我的收藏关于与联系

批量处理 1000 万条数据,我踩过的那些坑

上个月接了个需求,要处理 1000 万条用户数据,做数据清洗和转换。听起来简单,做起来才发现到处都是坑。内存溢出、超时、数据不一致,各种问题都遇到了。这篇文章记录下整个过程,以及我是怎么解决的。

需求

需求很简单:从数据库里读取 1000 万条用户数据,做数据清洗和转换,然后写入新表。

我一开始想,这有什么难的?写个脚本,循环读取,处理,写入,不就完了?

结果,我太天真了。

第一次尝试:一次性读取

我写了个脚本,一次性从数据库读取所有数据:

data = db.query("SELECT * FROM users") for row in data: process(row) db.insert("INSERT INTO new_users ...")

运行之后,内存直接爆了。1000 万条数据,每条数据大概 1KB,就是 10GB。我的服务器只有 8GB 内存,根本装不下。

而且,即使内存够,一次性读取也会导致数据库连接超时。数据库连接有超时限制,如果查询时间太长,连接会被断开。

第二次尝试:分批读取

我改成分批读取,每次读取 1000 条:

offset = 0 batch_size = 1000 while True: data = db.query(f"SELECT * FROM users LIMIT {batch_size} OFFSET {offset}") if not data: break for row in data: process(row) db.insert("INSERT INTO new_users ...") offset += batch_size

这次内存没问题了,但速度太慢。1000 万条数据,每次处理 1000 条,就是 1 万次查询。每次查询大概 100ms,就是 1000 秒,差不多 17 分钟。

而且,如果中间出错了,前面的数据已经处理了,后面的数据还没处理,数据就不一致了。

第三次尝试:使用游标

我改成使用游标,逐条读取:

cursor = db.cursor() cursor.execute("SELECT * FROM users") while True: row = cursor.fetchone() if not row: break process(row) db.insert("INSERT INTO new_users ...")

这次速度好一点,但还是慢。而且,如果中间出错了,游标会断开,数据就丢失了。

第四次尝试:批量插入

我改成批量插入,每次插入 1000 条:

cursor = db.cursor() cursor.execute("SELECT * FROM users") batch = [] while True: row = cursor.fetchone() if not row: break processed = process(row) batch.append(processed) if len(batch) >= 1000: db.bulk_insert("new_users", batch) batch = []

这次速度好多了。批量插入比逐条插入快很多,因为减少了数据库的往返次数。

但还有一个问题:如果中间出错了,前面的数据已经插入了,后面的数据还没插入,数据就不一致了。

第五次尝试:使用事务

我改成使用事务,确保数据一致性:

cursor = db.cursor() cursor.execute("SELECT * FROM users") batch = [] db.begin_transaction() try: while True: row = cursor.fetchone() if not row: break processed = process(row) batch.append(processed) if len(batch) >= 1000: db.bulk_insert("new_users", batch) batch = [] db.commit() except Exception as e: db.rollback() raise e

这次数据一致性没问题了,但如果数据量太大,事务会占用太多资源,导致数据库锁表。

最终方案:分批处理 + 事务 + 断点续传

我最终采用的方案是:分批处理,每批使用事务,支持断点续传。

def process_batch(start_id, end_id): cursor = db.cursor() cursor.execute(f"SELECT * FROM users WHERE id >= {start_id} AND id < {end_id}") batch = [] db.begin_transaction() try: while True: row = cursor.fetchone() if not row: break processed = process(row) batch.append(processed) if len(batch) >= 1000: db.bulk_insert("new_users", batch) batch = [] db.commit() return end_id except Exception as e: db.rollback() raise e # 主循环 current_id = 0 batch_size = 100000 while current_id < total_count: try: current_id = process_batch(current_id, current_id + batch_size) save_checkpoint(current_id) # 保存断点 except Exception as e: log_error(e) # 从断点继续 current_id = load_checkpoint() continue

这个方案的优点:

  • 内存占用小:每次只处理一批数据,不会占用太多内存
  • 速度快:批量插入,减少数据库往返次数
  • 数据一致:每批使用事务,确保数据一致性
  • 可恢复:支持断点续传,出错后可以从断点继续

其他优化

1. 并行处理

如果数据之间没有依赖关系,可以使用并行处理。但要注意数据库连接数限制。

2. 索引优化

确保查询字段有索引,可以大幅提升查询速度。

3. 连接池

使用连接池,避免频繁创建和关闭数据库连接。

4. 监控和日志

添加监控和日志,方便排查问题。记录处理进度、错误信息、性能指标等。

总结

处理大量数据,不是简单的循环读取和写入。需要考虑内存、性能、一致性、可恢复性等多个方面。

关键点:

  • 分批处理:避免一次性加载所有数据
  • 批量操作:减少数据库往返次数
  • 事务保证:确保数据一致性
  • 断点续传:支持错误恢复
  • 监控日志:方便排查问题

如果数据量真的很大,可以考虑使用专门的数据处理工具,比如 Spark、Flink 等。但对于大多数场景,上面的方案就够用了。

评论区