需求
需求很简单:从数据库里读取 1000 万条用户数据,做数据清洗和转换,然后写入新表。
我一开始想,这有什么难的?写个脚本,循环读取,处理,写入,不就完了?
结果,我太天真了。
第一次尝试:一次性读取
我写了个脚本,一次性从数据库读取所有数据:
data = db.query("SELECT * FROM users") for row in data: process(row) db.insert("INSERT INTO new_users ...")运行之后,内存直接爆了。1000 万条数据,每条数据大概 1KB,就是 10GB。我的服务器只有 8GB 内存,根本装不下。
而且,即使内存够,一次性读取也会导致数据库连接超时。数据库连接有超时限制,如果查询时间太长,连接会被断开。
第二次尝试:分批读取
我改成分批读取,每次读取 1000 条:
offset = 0 batch_size = 1000 while True: data = db.query(f"SELECT * FROM users LIMIT {batch_size} OFFSET {offset}") if not data: break for row in data: process(row) db.insert("INSERT INTO new_users ...") offset += batch_size这次内存没问题了,但速度太慢。1000 万条数据,每次处理 1000 条,就是 1 万次查询。每次查询大概 100ms,就是 1000 秒,差不多 17 分钟。
而且,如果中间出错了,前面的数据已经处理了,后面的数据还没处理,数据就不一致了。
第三次尝试:使用游标
我改成使用游标,逐条读取:
cursor = db.cursor() cursor.execute("SELECT * FROM users") while True: row = cursor.fetchone() if not row: break process(row) db.insert("INSERT INTO new_users ...")这次速度好一点,但还是慢。而且,如果中间出错了,游标会断开,数据就丢失了。
第四次尝试:批量插入
我改成批量插入,每次插入 1000 条:
cursor = db.cursor() cursor.execute("SELECT * FROM users") batch = [] while True: row = cursor.fetchone() if not row: break processed = process(row) batch.append(processed) if len(batch) >= 1000: db.bulk_insert("new_users", batch) batch = []这次速度好多了。批量插入比逐条插入快很多,因为减少了数据库的往返次数。
但还有一个问题:如果中间出错了,前面的数据已经插入了,后面的数据还没插入,数据就不一致了。
第五次尝试:使用事务
我改成使用事务,确保数据一致性:
cursor = db.cursor() cursor.execute("SELECT * FROM users") batch = [] db.begin_transaction() try: while True: row = cursor.fetchone() if not row: break processed = process(row) batch.append(processed) if len(batch) >= 1000: db.bulk_insert("new_users", batch) batch = [] db.commit() except Exception as e: db.rollback() raise e这次数据一致性没问题了,但如果数据量太大,事务会占用太多资源,导致数据库锁表。
最终方案:分批处理 + 事务 + 断点续传
我最终采用的方案是:分批处理,每批使用事务,支持断点续传。
def process_batch(start_id, end_id): cursor = db.cursor() cursor.execute(f"SELECT * FROM users WHERE id >= {start_id} AND id < {end_id}") batch = [] db.begin_transaction() try: while True: row = cursor.fetchone() if not row: break processed = process(row) batch.append(processed) if len(batch) >= 1000: db.bulk_insert("new_users", batch) batch = [] db.commit() return end_id except Exception as e: db.rollback() raise e # 主循环 current_id = 0 batch_size = 100000 while current_id < total_count: try: current_id = process_batch(current_id, current_id + batch_size) save_checkpoint(current_id) # 保存断点 except Exception as e: log_error(e) # 从断点继续 current_id = load_checkpoint() continue这个方案的优点:
- 内存占用小:每次只处理一批数据,不会占用太多内存
- 速度快:批量插入,减少数据库往返次数
- 数据一致:每批使用事务,确保数据一致性
- 可恢复:支持断点续传,出错后可以从断点继续
其他优化
1. 并行处理
如果数据之间没有依赖关系,可以使用并行处理。但要注意数据库连接数限制。
2. 索引优化
确保查询字段有索引,可以大幅提升查询速度。
3. 连接池
使用连接池,避免频繁创建和关闭数据库连接。
4. 监控和日志
添加监控和日志,方便排查问题。记录处理进度、错误信息、性能指标等。
总结
处理大量数据,不是简单的循环读取和写入。需要考虑内存、性能、一致性、可恢复性等多个方面。
关键点:
- 分批处理:避免一次性加载所有数据
- 批量操作:减少数据库往返次数
- 事务保证:确保数据一致性
- 断点续传:支持错误恢复
- 监控日志:方便排查问题
如果数据量真的很大,可以考虑使用专门的数据处理工具,比如 Spark、Flink 等。但对于大多数场景,上面的方案就够用了。
评论区