V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
• 请不要在回答技术问题时复制粘贴 AI 生成的内容
drymonfidelia
V2EX  ›  程序员

要对单个 6.20TB 的超大 csv 文件保持顺序的情况下进行去除重复行,有什么好思路?显然不可能加载进内存

  •  
  •   drymonfidelia · 2024-06-01 22:14:20 +08:00 · 14681 次点击
    这是一个创建于 372 天前的主题,其中的信息可能已经有所发展或是发生改变。
    101 条回复    2024-06-08 02:13:43 +08:00
    1  2  
    NXzCH8fP20468ML5
        1
    NXzCH8fP20468ML5  
       2024-06-01 22:21:11 +08:00 via Android
    duckdb 值得拥有
    dcsuibian
        2
    dcsuibian  
       2024-06-01 22:24:03 +08:00
    扔数据库不行吗?
    opengps
        3
    opengps  
       2024-06-01 22:30:03 +08:00
    能想到的只有数据库
    buaasoftdavid
        4
    buaasoftdavid  
       2024-06-01 22:30:13 +08:00
    内存里搞个哈希表,一行一行读 csv ,哈希表碰撞了就扔掉该行,没碰撞就插入哈希表再写到磁盘
    52boobs
        5
    52boobs  
       2024-06-01 22:33:22 +08:00 via Android
    表的结构是怎样的,有天然的主键吗
    kneo
        6
    kneo  
       2024-06-01 22:36:13 +08:00 via Android   ❤️ 1
    行数是多少?平均行长是多少?
    去重是应该基于整行文本还是列内容?比如 1.0 和 1 是否应该算做重复?
    每行前缀重复度是否够高?是否有某列( XXID )可以用于快速去重?
    机器性能如何?内存有多大?
    securityCoding
        7
    securityCoding  
       2024-06-01 22:38:58 +08:00 via Android
    spark 干的活?
    drymonfidelia
        8
    drymonfidelia  
    OP
       2024-06-01 22:40:30 +08:00
    @kneo 行数是 203 亿,平均行长 335
    去重是基于整行文本
    前缀重复度不高,没有 ID
    最高可以弄到 256GB 内存的服务器
    phrack
        9
    phrack  
       2024-06-01 22:43:02 +08:00 via iPhone
    光就这点信息说个屁呢,一行 8 个字符,是几千亿行,一行 1M 字符,是几百万行,这能一样吗?

    内存也不说,4KB 内存和 4GB 内存能一样吗?
    drymonfidelia
        10
    drymonfidelia  
    OP
       2024-06-01 22:44:17 +08:00
    @phrack 8 楼补充了
    rrfeng
        11
    rrfeng  
       2024-06-01 22:44:55 +08:00
    去重后大概会有多少行知道吗
    lifanxi
        12
    lifanxi  
       2024-06-01 22:48:21 +08:00   ❤️ 2
    遍历整个文件,每行都 hash 一下,把 hash 存到一个高性能 KV 里,不重复就输出当前行,重复就跳过当前行。
    drymonfidelia
        13
    drymonfidelia  
    OP
       2024-06-01 22:48:21 +08:00
    @rrfeng 不知道
    cndenis
        14
    cndenis  
       2024-06-01 22:49:20 +08:00
    如果不是要求严格不能丢数据的话, 可以用布隆过滤器去重, 误判率有公式可以算的, 有几十 GB 级别内存的话, 误判率应该比较低的
    phrack
        15
    phrack  
       2024-06-01 22:51:26 +08:00 via iPhone
    开启 zram ,256g 可以当作 512g 没问题。

    sha1sum 一个占用 20 字节,200 亿差不多占用 372g ,没问题。

    极低概率去掉非重复行,几乎可以忽略。
    rrfeng
        16
    rrfeng  
       2024-06-01 22:54:00 +08:00
    那就 bloomfilter 先过一遍看看情况。

    要硬算的话,就分块排序,排完序就好处理了。排序前记录下序号,最后还原一下顺序。
    hbcolorful
        17
    hbcolorful  
       2024-06-01 22:54:24 +08:00
    redis 的布隆过滤器可以考虑下
    NXzCH8fP20468ML5
        18
    NXzCH8fP20468ML5  
       2024-06-01 22:54:41 +08:00 via Android
    @drymonfidelia 看错了,还以为是 6GB 的 csv 文件在线处理呢,那确实不适合 duckdb 。

    还是上 spark 吧,硬盘配大点就行。

    203 亿行 csv 有那么大吗,我们每天备份全量的 17 亿行信息,保留几十天,用 orc 存储,也就几百 G 。
    kneo
        19
    kneo  
       2024-06-01 22:58:02 +08:00 via Android
    感觉可以试试 clickhouse 。
    yinmin
        20
    yinmin  
       2024-06-01 22:59:35 +08:00 via iPhone
    使用 apache spark ,用 python 的 PySpark 库试试,具体可以问 gpt-4
    NXzCH8fP20468ML5
        21
    NXzCH8fP20468ML5  
       2024-06-01 23:05:58 +08:00 via Android   ❤️ 1
    1. hash
    2. 加序号
    3. 按照 hash 分区
    4. 逐个处理分区
    5. 分区内排序
    6. 分区外归并排序

    只有单机的话,可以考虑用 duckdb ,多机就用 spark 吧。
    yangxin0
        22
    yangxin0  
       2024-06-01 23:07:23 +08:00
    分治:
    1 、用空间换时间(计算)
    2 、用时间(计算)换空间

    针对( 1 )有 spark 集群很快的,如果预算有限那么方法( 2 ):
    1 、把数据分成 N 块,并针对 N 块内进行去重
    2 、从 n 块中取一块,和剩下的 n-1 块去重,取这一块建立 hash or map 都可以,n-1 按照顺序读取
    3 、从剩下的 n-1 块中又进行步骤( 2 ), 直到 n=0
    4 、经过上述思路处理的 csv 就包含重复
    caola
        23
    caola  
       2024-06-01 23:10:33 +08:00
    直接存入 kvrocks (硬盘版 redis)
    dacapoday
        24
    dacapoday  
       2024-06-01 23:16:41 +08:00
    单文件这么大,文件系统压力也不小吧。多数文件系统单文件也不支持这么大吧
    james122333
        25
    james122333  
       2024-06-01 23:17:30 +08:00 via Android
    sed 有往下查找一样内容行并删除的工具都可以 其它的都要内存或硬盘空间 vim 就差在它开启文件要暂存 不然也可以
    YTMartian
        26
    YTMartian  
       2024-06-01 23:20:19 +08:00
    磁盘够用的话,先外部排序,然后直接读取,忽略与上一条相同的数据就行了吧,随机读取文件指定位置,也不用加载进内存
    dode
        27
    dode  
       2024-06-01 23:21:30 +08:00
    按顺序处理,依据一个合适长度的前缀做分区,逐行文本进行处理,写入到对应分区下面。

    检索特定行文本,是否在对应分区内存在,不存在则写入,存在就返回已存在。
    chen7897499
        28
    chen7897499  
       2024-06-01 23:27:27 +08:00 via Android
    emeditor
    NotLongNil
        30
    NotLongNil  
       2024-06-01 23:33:32 +08:00
    上面有人提到的 Bloom Filter 应该是相对最优的解法了,实现简单,占用内存低,速度也快。唯一的问题就是要选择合适的长度,将错误率降低,这需要一定的算法知识,不过现在可以问 AI 了,让 AI 给出公式
    ClericPy
        31
    ClericPy  
       2024-06-01 23:37:41 +08:00
    有点像 map reduce 场景,归并加快排

    问 AI 是好办法
    msg7086
        32
    msg7086  
       2024-06-02 02:57:38 +08:00   ❤️ 2
    我能想到的两种不同的做法。
    第一种,在内存不足的情况下,放弃掉内存,直接用 SSD 读写。
    在 SSD 上开一个数据库(比如 MySQL 或者 Postgres ),把已经存在的 hash 写到数据库里。
    然后流式扫描每一行,取 hash 比对数据库,如果存在 hash 就跳过,不存在就写到结果集里并添加到数据库。
    要快速稳妥可以用两种不同的 hash ,比如 xxHash 做一次过滤,SHA1 做二次检验。

    第二种,在内存不足的情况下,分批处理。
    多次流式扫描每一行,取 hash ,每次只处理 hash 第一个 hex 字符相同的那些数据。
    第一次只索引和处理 sha1hash[0] == '0',第二次只索引和处理'1',这样可以把内存需求降到 1/16 ,缺点是 hash 计算也会是 16 倍。
    稍微优化一下的话,可以在第一次遍历的时候在数据上追加 sha1hash[0]作为分区标记,这样后面 15 次就不会重复计算,缺点是会每行多一两个字节,而且要多写入一次磁盘。
    esee
        33
    esee  
       2024-06-02 03:18:00 +08:00 via Android
    什么叫保级顺序?比如在一百万的位置和 200 万的位置有重复项,则删除后面重复的那个是么。然后能提供多大的内存和硬盘,
    wxf666
        34
    wxf666  
       2024-06-02 03:52:41 +08:00
    想到个方法,预计耗时:10 小时,准确率:100% 剔除重复行。


    ## 简单流程

    1. 分块排序。
    2. 同时循环每块,删掉非首次出现的重复行。
    3. 分别循环每块,按行号顺序,输出未被删掉的行。


    ## 详细流程

    1. 分块 240GB 文件,每块排序后,写入固态。同时保存每行长度+原始偏移量(约 (240 << 30) / 335 * 8 / 1024 ^ 3 = 5.7 GB )。
    2. 利用小根堆,流式排序(按 <string, offset> 排)所有分块每一行。非首次出现行,保存该行偏移量,到相应块的删除名单里。
    3. 循环每块,排序原始偏移量、删除名单,按序输出(未被删除的)相应行,至最终文件。


    ## 耗时计算

    1. 顺序读写:9 小时( 3 次顺序读,2 次顺序写,假设都为 1GB/s )。
    2. 内存字符串排序:< 1 小时(实测轻薄本 i5-8250U ,每秒归并排序 200W 次 335 长度的随机字符串,约 6900W 次比较)。
    - 多线程快排/归并:`(每块行数 = (240 << 30) / 335) * log2(每块行数) * 块数 = 6017 亿` 次比较,我的轻薄本 8 线程需 0.3 小时。
    - 单线程小根堆:`202e8 * log2(块数 = 6.2 * 1024 / 240 = 26.5) * 2 = 1910 亿` 次比较,需 0.7 小时。
    wxf666
        35
    wxf666  
       2024-06-02 04:13:29 +08:00   ❤️ 1
    34 楼纠正下数据,实测轻薄本 i5-8250U ,1.5 秒归并排序 320W 个 336 长度的随机字符串,约 6500W 次比较。

    - 多线程快排/归并:总计 6017 亿次比较,我的轻薄本 8 线程需 0.5 小时。
    - 单线程小根堆:总计 1910 亿次比较,单线程需 1.2 小时。

    差不太远。。
    wxf666
        36
    wxf666  
       2024-06-02 05:40:34 +08:00
    @dcsuibian #2 ,@opengps #3 ,@msg7086 #32:
    如果数据库,每秒写入 10W 条,总计要 203e8 / 1e5 / 3600 = 56 小时?

    @YTMartian #26 ,@dode #27:
    就算固态 4K 随机读写有 10W IOPS ,算下来也要 56 小时吧?

    wxf666
        37
    wxf666  
       2024-06-02 05:42:19 +08:00
    @cndenis #14 ,@hbcolorful #17 ,@NotLongNil #30:

    用布隆过滤,几十 GB 好像不够。
    在线算了下,50 GB + 15 函数,都会有 1 / 25000 概率出错。。
    250 GB + 11 函数,算完 203 亿行,才能有 83.8% 的概率,一个不出错?


    @phrack #15:

    压缩内存,来存 hash ?好像真的可行。。
    平均而言,写入 (372 << 30) / 4096 = 1 亿次,就会占满 372 GB 内存页。即,几乎一开始就会启用 zram ?
    我在别处看了看,lz4 每秒能有 200W 次 IO ,算下来要 2.8 小时即可?
    话说,这个和 Bloom Filter 相比,哪个出错概率小呢?


    dingwen07
        38
    dingwen07  
       2024-06-02 05:56:57 +08:00
    @wxf666
    Bloom filter 的假阳性率是要看哈希函数的数量的吧
    hobochen
        39
    hobochen  
       2024-06-02 06:20:59 +08:00
    大概想了一下,一定深度的前缀树,叶子节点是哈希表或者平衡树存原来的行号应该是一个可行的方案。
    hobochen
        40
    hobochen  
       2024-06-02 06:22:24 +08:00
    更正:叶子节点应当是一个哈希表/平衡树; k 是哈希值,v 是行号
    noqwerty
        41
    noqwerty  
       2024-06-02 09:45:33 +08:00 via iPhone
    可以试试 polars streaming + file sink ? https://www.rhosignal.com/posts/streaming-in-polars/
    dode
        42
    dode  
       2024-06-02 10:06:01 +08:00
    @wxf666 这个是连续 IO 的读写很快,文件在 Linux 系统中读写也可以开启内存缓存
    dode
        43
    dode  
       2024-06-02 10:12:30 +08:00
    @wxf666 固态硬盘随机读写 10 IOPS ,服务器也可以搞几块 U3 固态,存这个计算缓存加速呀
    tonywangcn
        44
    tonywangcn  
       2024-06-02 10:12:52 +08:00
    @wxf666

    你的计算好像不对吧

    n = 20300000000
    p = 0.000000001 (1 in 999925223)
    m = 875595082773 (101.93GiB)
    k = 30

    https://hur.st/bloomfilter/?n=20300000000&p=1.0E-9&m=&k=
    acapla
        45
    acapla  
       2024-06-02 10:16:29 +08:00
    不限时间的话:

    for (i = 1; i < N; i++)
    for (j = 0; j < i; j++)
    if line[i] == line[j]:
    skip line[i]
    HanashirodotETH
        46
    HanashirodotETH  
       2024-06-02 10:51:18 +08:00
    分块 - 用 96 位哈希编号,去重,排序 - 多路归并
    但是也要大概 240G 内存。
    wxf666
        47
    wxf666  
       2024-06-02 10:54:15 +08:00
    @dingwen07 #38 是的。37 楼有提及到,用多少哈希函数。


    @dode #42

    《写入到对应分区下面》这个是缓存尽可能多的文本(如 1GB ),再写入,是吗?
    《检索特定行文本,是否在对应分区内存在》这个是如何做到,顺序读的呢?


    @tonywangcn #44

    平均每十亿条,就误认为一次,某行是重复行,导致丢失该行?
    那你要问 @drymonfidelia 愿不愿意丢失几十行数据了。。
    sunnysab
        48
    sunnysab  
       2024-06-02 10:59:26 +08:00
    @phrack sha1sum 本身如何存储或索引呢?使用 BTree (或类似的树)吗,还是多级 hash 索引?
    wxf666
        49
    wxf666  
       2024-06-02 11:03:24 +08:00
    36 37 楼,好像没 @ 成功。。再试一下。。


    @opengps #3
    @msg7086 #32

    如果数据库,每秒写入 10W 条,总计要 203e8 / 1e5 / 3600 = 56 小时?


    @hbcolorful #17
    @NotLongNil #30

    用布隆过滤,几十 GB 好像不够。
    在线算了下,50 GiB + 15 函数,都会有 1 / 26000 概率出错,大约丢失 80W 行数据?
    250 GiB + 11 函数,算完 203 亿行,才能有 83.8% 的概率,不丢失任何数据,也不保留任何重复行?


    hguangzhen
        50
    hguangzhen  
       2024-06-02 11:22:14 +08:00
    惊了~ 竟然没人提到 RocksDB 吗? 本地的文件型 KV 存储库,内置 bloomfilter ,磁盘空间够用,应该很简单的
    mayli
        51
    mayli  
       2024-06-02 11:46:21 +08:00
    如果你只是想最简单的解法(不考虑最高效率或者多机并发)可以试试 sort+uniq

    sort 是可以排序比内存大的文件的: https://vkundeti.blogspot.com/2008/03/tech-algorithmic-details-of-unix-sort.html
    然后排序后的 uniq 是不怎么吃内存

    不过我看有个需求是要保持文件顺序的话,你可以用 uniq --repeated 来找到重复行,如果你重复行不多,那搞个脚本直接过滤一遍源文件就好,也是线性的。
    Kaiv2
        52
    Kaiv2  
       2024-06-02 11:58:04 +08:00
    1. 先计原始文件 a.txt 算每一行 hash 保存到 hash.txt 文件
    2. 复制一份 hash.txt -> hash-2.txt 用于去重计算
    3. 取 hash-2.txt 文件中 10000(这个数根据内存大小预估) 个 hash 前 8 位不重复 hash_array_8
    4. 重复的的写入 hash-4.txt, 剩于的写入 hash-2.1.txt -> hash-2.txt , 循环处理直到 hash-2.txt 没有记录
    ```txt
    let limit = 10000; // 控制内存使用
    let hash_array_8 = [];
    let cache_line = []
    for(let h_line: read_line(hash_2.txt)) {
    if(hash_array_8.size < limit) {
    if(!hash_array_8.has(h_line.sub(8))) {
    hash_array_8.add(h_line.sub(8))
    }
    }
    if(hash_array_8.has(h_line.sub(8))) {
    if(cache_line.has(h_line)) {
    write(hash-4.txt);
    } else {
    cache_line.add(h_line);
    }
    } else {
    write(hash-2.1.txt);
    }
    }
    mv(hash-2.1.txt, hash-2.txt)
    ```
    5. 得到 hash.txt 跟文件一一对应,hash-4.txt 是重复的记录
    6. hash-4.txt (如果重复的不多)直接读取到内存,对应读取 a.txt, hash.txt 每一行,比较 hash 重复跳过,不重复写入 b.txt
    没有考虑过计算量,内存不够可以考虑试试这个办法
    Kaiv2
        53
    Kaiv2  
       2024-06-02 12:59:44 +08:00
    @Kaiv2 写着写着写成了单机的,这么做多此一举,太蠢了。。。应该是 分 hash-3.1 .. n.txt 多个机器同时处理,然后合并重复数据 hash-4.1..n.txt
    drymonfidelia
        54
    drymonfidelia  
    OP
       2024-06-02 13:21:02 +08:00
    @wxf666 不能丢失数据
    @esee 是的。最大可以提供 256GB 内存,硬盘没有限制
    msg7086
        55
    msg7086  
       2024-06-02 13:44:48 +08:00
    @wxf666 #35 上 TB 的数据怎么处理都是会很慢的。(一秒 10w 条数据可能到不了)

    我建议用第三方数据库纯粹是因为这样对实现的要求最低,不需要你搞大内存服务器,不需要自己开发复杂的算法,全部用已知的成熟的方案,你只要插上一堆 SSD 然后干别的事就行了,等个几天数据就都跑完了。算法简单所以要根据需求修改起来也简单,可维护性也好。(用人话说就是,工程师不需要加班,让服务器加班就行。)

    现实当中从 SSD 读取数据到内存也是要花时间的,这么大的量级还要跑前后依赖的操作,我是觉得快不起来。

    (如果能并行 map reduce 倒是能快不少,但这里不太行。)
    lmshl
        56
    lmshl  
       2024-06-02 13:56:56 +08:00   ❤️ 1
    版本答案:RocksDB (与其他 leveldb family 产品)

    解析:203 亿个 bloomfilter 在 p=0.01 下所需的内存空间约为 23.75GB 。实际上,去重所需的空间会少于 203 亿,所以在这个内存空间下,实际 p 值将进一步降低。

    大部分人可能对 bloomfilter 的使用存在误解,他们只考虑在只有 bloomfilter 单一算法存在的前提下来解决需求,这显然是错误的。现代数据库对 bloomfilter 的应用主要是用来降低 miss key 对磁盘 IO 的影响。如果 bloomfilter 认为这个 key 没有出现过,那么这个 key 确实没有出现过。当 bloomfilter 认为它可能出现过,那么出现的概率为 1-p ,此时需要回表二次确认(磁盘 IO )。

    假设一个典型的重复度为 10 倍的 200 亿数据表文件,在这个空间下,p 值会低至 1e-20 。

    那么对这个文件去重,总共会发生 200 亿次内存 bloomfilter 读取,20 亿次 bloomfilter 写入+磁盘顺序写入,以及 180 亿次磁盘随机读取。(考虑到数据库对磁盘的批量写入优化,sstable/memtable 这个数值将会被巨幅降低)

    假设一个重复度为 0.1 倍的 200 亿数据表文件,在这个空间下,p 值变化不大。

    那么对这个文件去重,总共发生 200 亿次内存 bloomfilter 读取,180 亿次 bloomfilter 写入+磁盘顺序写入,以及 20 亿次磁盘随机读取。(同上)

    根据网上其他人做的吞吐量测试,rocksdb 在现代硬件条件下可以稳定达到 10k*rows/s 以上的写入性能,或>1GB/s 的写入吞吐量。乐观地估计,6.2TB 的文件应该能在 2 小时到 2 天左右完成去重。
    cabbage
        57
    cabbage  
       2024-06-02 14:11:28 +08:00 via Android
    @wxf666 布隆过滤器单独用确实不行,免不了假阳性,即便事后检查那也逃不了随机读,不是稳定的方法。但如果纯粹作为首次顺序读的过滤器来用,应该还不错,可以降低一些输入数据量。

    to 34 楼:其实没看懂第二步的堆排序怎么回事,是说在一次排序中对所有行进行排序并去重吗?如果这样的话,那比较的时候是不是还需要在内存里保留所有原始 string ?来讨教下,乐意的话可否点明一二?
    harmless
        58
    harmless  
       2024-06-02 14:25:26 +08:00 via iPhone
    203 亿行全部计算成 hash ,再加上行号,大概 700 多 G
    1. 遍历每行,计算出 hash ,按 hash 第一位将 hash 和行号写入不同的文件,例如 hash 第一位为 0 ,则写入 hash_0.txt ,这样一共会有 16 个文件,每个文件大概 40 多 G
    2. 分别对每个文件按 hash 排序,找出重复的需要删除的行号,记录到文件中
    3. 遍历原始文件,删除需要删除的行
    lrjia
        59
    lrjia  
       2024-06-02 14:29:38 +08:00
    先 hash ,按照 hash 前缀分块成多个文件,使分块后单块的大小可以放入内存。再对每块使用 hash 表去重。最后合并多个文件,用归并排序的做法。这中间应该都是文件的顺序读写。
    cloudzhou
        60
    cloudzhou  
       2024-06-02 15:26:32 +08:00   ❤️ 1
    布隆过滤器,申请硬盘 db 空间作为布隆过滤器存储,按位标记,只要空间足够大,冲突就很小
    在布隆过滤器冲突情况下,将冲突部分,存入到其他人提到的某种 kv db ,然后排除重复处理
    wxf666
        61
    wxf666  
       2024-06-02 15:33:02 +08:00   ❤️ 1
    @phrack #15

    突然想起来,zram 可能行不通。。

    sha1 的结果,是不是相当于随机数据?随机数据,压缩不了啥吧。。

    即,256 GB 内存,当不了 512 GB 用?全都用来存压缩比 100% 后的数据了?


    @cabbage #57

    需要保留 27 行原始 string ,在小根堆里。

    第二步目的:检查出各块中,偏移量非最小的重复行,记录进删除名单中。

    第二步时,已经有 26.5 个 240GB 的、排序好的块。

    参考多路归并,可以流式构造出有序的 (string, offset, chunk_index)。

    当 string 与上一个不同时,说明碰到偏移量最小的新行了(即,全文首次出现)。

    当 string 与上一个相同时,说明重复行了,此时往 "to_del_${chunk_index}.txt" 里记录 offset 。

    (可以攒多点再写,反正只用了 27 个字符串 + 27 * 1GB 缓冲区,还剩 200+GB 内存呢。。)

    以前写过类似的,10+ MB 内存去重 13 GB 文件,里面也有用到多路归并:/t/1031275#reply15
    conan257
        62
    conan257  
       2024-06-02 17:01:07 +08:00
    学习下
    haha1903
        63
    haha1903  
       2024-06-02 19:34:18 +08:00
    hyperloglog
    peterxulove
        64
    peterxulove  
       2024-06-02 20:52:53 +08:00
    对每一行进行哈希,哈希后的哈希值每个位置的值为一个叶子结点建立二叉树,哈希值的位数就是二叉树的层级,判断的时候遍历二叉树即可。
    XuYijie
        65
    XuYijie  
       2024-06-02 20:58:50 +08:00 via Android
    把数据读到数据库,然后分批处理,或者使用 easyExcel 分批读取,处理后把处理后的数据导出到一个新的 csv
    winiex
        66
    winiex  
       2024-06-02 21:06:02 +08:00
    以行顺序读内容,记录一个每行的 hash 值,发生碰撞抛弃当前行
    IwfWcf
        67
    IwfWcf  
       2024-06-02 21:19:43 +08:00
    如果只是要去重那分块排序就好了,如果要保留原有顺序的话那前面有人提到的 bloom filter 命中的情况下再去数据库查询确认应该是最优方案了
    qdwang
        68
    qdwang  
       2024-06-02 21:21:51 +08:00
    203 亿行,每一行做 BLAKE3 ,按照 bit 为 1 的和,来进行分类,共分 256 类。

    平均每类大小 20300000000 * 256 / 8 / 1024 / 1024 / 1024 / 256 = 2.36G

    存成 256 个分类文件,根据你可用内存大小,载入对应数量的分类在内存里作为 cache 。未命中,就随机内存里去除一个,替换成新的。也很容易做成分布式,每台机器管理一定数量的分类。

    由于 BLAKE3 目标是 128bits 安全,所以你可以缩小到 128bit 使用,256g 内存电脑可以装得下 210 个分类在 cache 里。
    qdwang
        69
    qdwang  
       2024-06-02 21:23:41 +08:00
    说错了,BLAKE3 使用 128bit 后,每个分类仍然是 2.36g ,但是分类数量减小到 128 个。
    vivisidea
        70
    vivisidea  
       2024-06-02 21:36:37 +08:00
    1. 计算每一行的 hash , 保存到文件里 hashes.txt
    2. 对 hashes.txt 进行外部排序,外部排序对内存要求低,压力给到磁盘
    3. 遍历 hashes.txt 找出重复,因为已经有序,所以简单对比当前行和上一行即可知道是否重复
    Hawthorne
        71
    Hawthorne  
       2024-06-02 21:44:06 +08:00
    不能用内存映射吗?
    psyer
        72
    psyer  
       2024-06-02 21:47:57 +08:00 via Android
    @XuYijie pandas 的 easyExcle?会不会非常慢…
    psyer
        73
    psyer  
       2024-06-02 21:49:02 +08:00 via Android
    pyspark 效果如何?😁
    qinrui
        74
    qinrui  
       2024-06-02 21:57:44 +08:00
    emeditor 直接打开处理
    cocalrush
        75
    cocalrush  
       2024-06-02 22:48:23 +08:00
    直接用阿里云的 odps 生成个离线表跑下是不是就行了...
    Keuin
        76
    Keuin  
       2024-06-02 23:46:43 +08:00   ❤️ 1
    awk 每行尾追加逗号和行号,整个文件每个行都追加一下,占 6.2T
    unix sort 工具外排序,直接按字母表排序,占 6.2T 。重复行会变成相邻的,编号不一样。输出另占 6.2T
    用 awk 配合 uniq ,去重,全内存 O(1)空间算法,输出占 6.2T ,即为最终结果

    中间文件可以在不用的时候删掉,最大同时出现 2 份,也就是需要额外 2*6.2T 磁盘空间,由于都是流式算法,内存用量为很小的常数
    Keuin
        77
    Keuin  
       2024-06-02 23:49:28 +08:00
    @Keuin 最后还差一步,按行号升序排序,重新排序回原来的顺序,最大额外磁盘空间不变
    qbmiller
        78
    qbmiller  
       2024-06-03 08:11:19 +08:00
    再来一个文件 csvA. 然后这 2 文件 ,双指针遍历. A 放不重复的,
    基于顺序的话,是可以的
    blankmiss
        79
    blankmiss  
       2024-06-03 09:37:37 +08:00
    或者你给个脱敏德样本数据出来看
    cndenis
        80
    cndenis  
       2024-06-03 09:39:11 +08:00
    @wxf666 如果需要严格不能丢数据的话, 不能单用布隆过滤器.

    假设重复率比较低的的话,, 可以做两轮读取

    第一轮边读边构造布隆过滤器, 把发现的冲突的行记录到数据库

    第二轮先把数据库中值导入新的布隆过滤器, 然后用它来过滤原表, 对有冲突的行查用数据库确证没重复再输出
    Dream95
        81
    Dream95  
       2024-06-03 09:45:39 +08:00
    为啥关注点都在去重上面,就按照 MapReduce 的思路归并排序,再去重不就很好
    zhuangzhuang1988
        82
    zhuangzhuang1988  
       2024-06-03 10:08:45 +08:00
    直接 sqlite 试试

    ```python
    import sqlite3

    with sqlite3.connect("_temp.db") as conn:
    c = conn.cursor()
    c.execute(
    """
    CREATE TABLE kv (
    line TEXT UNIQUE
    );"""
    )
    with open("xxx.csv", "r", encoding="utf8") as fin:
    with open("xxx1.csv", "w", encoding="utf8") as fout:
    for line in fin:
    line = line.strip()
    if line:
    r = c.execute(
    "insert into kv(line) select :line where :line not in (select line from kv)",
    {"line": line},
    )
    if r.rowcount > 0:
    fout.write(line + "\n")
    ```
    ltux
        83
    ltux  
       2024-06-03 10:51:00 +08:00
    不能全部加载到内存,那没法用哈希表去重。
    简单地归并排序,再顺序读取去重就完了。归并排序是稳定排序,可以保持行原来的顺序,也适合用于对超出内存限制的数据排序。
    结果就是 gnu sort 就完了-_-,加 --unique 选项一步到位。

    sort --stable --unique --output=OUTPUT.csv INPUT.csv
    根据你 cpu 核心数量 N 加个 --parallel=N 选项就完
    lsk569937453
        84
    lsk569937453  
       2024-06-03 11:03:30 +08:00
    @zhuangzhuang1988 帮你试过了,一秒处理 600 行,202 亿行大概要处理 389 天。
    chutianyao
        85
    chutianyao  
       2024-06-03 11:09:42 +08:00
    203 亿行,逐行 hash, 假设 hash256, 单个值占用内存 32 字节, 203 亿行差不多试用内存 604G

    1. 逐行读取并进行 hash
    2. 使用 hash 值构建前缀树
    3. 对每一行的哈希值,有两种情况:
    1) 前缀树中已经存在, 说明哈希值重复, 该行重复了. 操作: 直接忽略本行,读取并处理下一行
    2) 前缀树中不存在, 说明行不重复. 操作: 新建文件 result.csv, 将该行追加到 result.csv 中, 再处理下一行

    关键点:
    1.所有行的哈希值占用空间 604G, 内存才 256G 无法直接存储; 使用硬盘存储后续逐行比对查找的性能太差, 所以这里使用前缀树来存储, 减少相同前缀的哈希值使用的内存空间.(具体能节省多少内存,取决于哈希值/文本行的重复比例, 极端情况 203 亿行都不重复的情况下, 前缀树估计也会把内存耗尽?)
    2.发现重复行,不直接从原文件中删除, 而是新建文件保存结果. 目的是使用追加写文件的形式、减少随机读写文件造成的性能磁盘 io 损耗
    MoYi123
        86
    MoYi123  
       2024-06-03 11:12:39 +08:00
    感觉很多人不懂“保持原先顺序是什么意思”

    是[1,3,2,3] -> [1,3,2]
    而不是[1,3,2,3] -> [1,2,3]
    lsk569937453
        87
    lsk569937453  
       2024-06-03 11:14:43 +08:00
    @chutianyao 1.系统的最大内存是 256G ,当所有的行都不相同的时候会占用 604G 啊。你不会假定有多少重复吧。。。。
    forty
        88
    forty  
       2024-06-03 11:54:53 +08:00
    学到了 1 个新知识: 布隆过滤器
    感谢大家!

    OP 的这个数据量,用哈希表也足够处理了。也可以先布隆一遍,找出一定不存在重复的,再用哈希排查不确定是否重复的。

    化整为零,先用哈希进行分类,再在分类内部进行除重(省内存,时间换空间)。

    用普通的编程语言,普通的 PC 即可,不依赖其他数据软件。

    203 亿 介于 2^34 与 2^35 (2 的 35 次方) 之间,按 2^35 算,因此 35 比特就能表示行号,可以给它 5 个字节。

    用哈希进行分类,分多少个类就写多少个文件,只记录 MD5 和行号。
    全部分类文件都写完之后,依次载入 1 个分类文件到内存,用哈希表除重,输出哈希重复(应删除的行)的行号,问题就基本解决了。

    如果分 65536 个类,则每个分类下约有 50 多万个数据,每个分类文件约 10MB 。

    如果分 256 个类,则每个分类下约有 8 千万个数据,每个分类文件约 1.6GB ,老 PC 也能干。

    如果分 16 个类,则每个分类下约有 13 亿个数据,每个分类文件约 26GB ,现在的普通 PC 都可以胜任。

    如果强迫症觉得可能有哈希冲突,那就可以再加 1 个不同的哈希算法,对这个数量级来说是基本不用考虑 MD5 冲突的。
    chutianyao
        89
    chutianyao  
       2024-06-03 12:17:39 +08:00
    @lsk569937453 所以我说了嘛,看行重复比例. 同时哈希值前缀相同, 也能节省一些内存吧.
    这个方案只是存在一定可行性,但不保证
    lttzzlll
        90
    lttzzlll  
       2024-06-03 16:37:03 +08:00
    cat input.csv | sort | uniq > out.csv

    力大出奇迹。优化阻碍发展。你先试试再说。大不了机器崩了呗。
    sampeng
        91
    sampeng  
       2024-06-03 17:49:49 +08:00
    这么点数据就要 spark ,mr 了?
    楼上很多说的没错啊。。你倒是试试 sort |uniq 之后看看结果啊。慢是肯定慢,但是试试不比你纠结强。。

    =====

    rocksdb 是一个解决方案。但如果不想上东西自己算也不是不行。自己构建结构体和硬盘文件内映射关系。hash 一定要在内存里面才能对比?在文件里面就不行么。现在都是 ssd ,随机读取没啥吧。

    我猛的一想就是

    1.hash 直接建在硬盘上。每次对比用 seed 偏移来查找。这种业务使用最好别用布隆,毕竟不是近似求结果。而是最终求结果。

    2.6T 文件。内存里只建一个够 N 条的 hash 。先读 N 条。计算 N 条里的没有重复的。保存到文件 a 。然后一直递归下去。得到 n 个小文件。然后问题就变成了 n 个小文件去重的问题。内存大,就把第一个文件读出来,去其他文件一个一个比。以此递归处理。当然,连小文件都不需要,自己规划好数据结构把 6T 文件看成 n 个小文件也是一个逻辑。这个逻辑下哪怕 1G 内存也能算出来。就看时间了。
    sampeng
        92
    sampeng  
       2024-06-03 17:55:15 +08:00
    我也是想多了。。哪有这么复杂。

    读一条。算 hash 。然后读下一条,算下一条的 hash 。相同就扔掉。。。没有相同的就写到另一个文件里面去。一个递归好像就完事了。这应该也是 sort|uniq 的逻辑。只需要内存 (每行 byte *2 )。这就是纯粹比 ssd 的速度。想加速就是利用 cpu 的并行运算搞搞分块就好了。
    lttzzlll
        93
    lttzzlll  
       2024-06-03 18:34:48 +08:00
    @lttzzlll cat input.csv | sort -u | uniq > out.csv 你试试
    james122333
        94
    james122333  
       2024-06-03 19:16:12 +08:00 via Android
    @lttzzlll

    还是看我讲的吧 每行往下查有重複就删除 毕竞 global 搜索删除还是挺好的 一行处理完换下一行 也顺带保证了顺序 内存与硬盘都可以占用少
    james122333
        95
    james122333  
       2024-06-03 19:40:37 +08:00 via Android
    @lttzzlll

    应该算是种高明的手法
    LieEar
        96
    LieEar  
       2024-06-04 09:12:15 +08:00
    6.20TB 的超大 csv ,难以置信。
    我觉得可以试试 DuckDB
    kuagura
        97
    kuagura  
       2024-06-04 09:36:25 +08:00 via Android
    换一个 10T 内存的机器 采购 你都有得赚,项目结束再卖掉
    psyer
        98
    psyer  
       2024-06-04 13:12:28 +08:00 via Android
    看到这么多讨论,给出方案,有没有人给一个切实可行的代码?实践是检验真理的唯一标准。
    wxf666
        99
    wxf666  
       2024-06-07 16:06:08 +08:00
    C++ 新人,写个去重练练手。

    - 结果:2.50 GB 文本( 900 万行,336 字/行),1GB 内存限制,6 秒保持顺序地去重完毕。
    - 硬件:七年前 i5-8250U 轻薄本,读写在内存盘中(读 8G/s ,写 3G/s ,1000 元 2TB 固态都能有的速度,不过分吧?)
    - 预计:4 小时能去重完毕 6.20TB ?

    新人刚学会写,可能还有诸多不足之处。
    写的过程中,还有很多优化点没写。比如:

    1. 排序时,子范围太小,转为其他排序方式。
    2. 读写文件,用的默认缓冲区大小( 4K ? 16K ?不知道多大,估计很小。。)
    3. 分块时,可以去除重复行,减少稍后读写数据量。

    继续改进点:

    - 转用 hash 去重,大幅减少硬盘读写数据量。
    - 只是要承担极小概率重复风险。但 Git 也在用这种方式。。
    - 实在不行,发现重复 hash 时,再去读原文件完整比较。

    ## 截图


    ## 代码

    ```c++
    // V 站吞空格,缩进改为全角空格了

    #include <queue>
    #include <vector>
    #include <thread>
    #include <cstring>
    #include <sstream>
    #include <fstream>
    #include <iostream>
    #include <algorithm>
    #include <stdexcept>
    #include <filesystem>
    #include <string_view>
    #include <fcntl.h>
    #include <unistd.h>
    #include <sys/mman.h>
    #include <sys/stat.h>

    using std::ios;
    using std::vector, std::string_view;
    using std::to_string, std::ofstream;
    namespace fs = std::filesystem;

    int max_thread = 8;
    size_t max_memory = 1ull << 30;
    const auto tmpDir = fs::temp_directory_path();

    struct Meta {
       ptrdiff_t offset;
       size_t length;
       friend ofstream& operator<< (ofstream& ofs, const Meta& self) {
         ofs.write(reinterpret_cast<const char*>(&self), sizeof self);
         return ofs;
      }
    };

    struct Line {
       int chunkIdx{};
       ptrdiff_t offset{};
       string_view str{};
       auto operator> (const Line& other) {
         return std::tie(str, chunkIdx, offset)
          > std::tie(other.str, other.chunkIdx, other.offset);
      }
    };

    template <class T = char>
    class MappedFile {
       int fd = -1;
       const T* ptr{};
    public:
       const T* data{};
       size_t size{};

       explicit MappedFile(const fs::path& file) {
         struct stat64 fs{};
         fd = open64(file.c_str(), O_RDONLY);
         if (fd != -1 && fstat64(fd, &fs) != -1) {
           size = static_cast<size_t>(fs.st_size) / sizeof(T);
           data = ptr = static_cast<T*>(mmap64(nullptr, fs.st_size, PROT_READ, MAP_SHARED, fd, 0));
        }
      }

       MappedFile(const MappedFile& other) = delete;

       MappedFile(MappedFile&& old) noexcept:
         fd(old.fd), ptr(old.ptr), data(old.data), size(old.size) {
         old.fd = -1;
         old.ptr = old.data = nullptr;
      }

      ~MappedFile() {
         if (data) munmap(const_cast<T*>(data), size * sizeof(T));
         if (fd != -1) close(fd);
      }

       auto end() const {
         return data + size;
      }

       operator const T*&() {
         return ptr;
      }
    };

    template <class Iter>
    void mergeSort(Iter* src, Iter* dst, size_t len, int max_thread = 1, int id = 1) {
       if (id == 1)
         std::copy_n(src, len, dst);
       if (len > 1) {
         std::thread t;
         size_t half = len / 2;
         if (id < max_thread) // 只在左子树开启新线程
           t = std::thread(mergeSort<Iter>, dst, src, half, max_thread, id * 2);
         else
           mergeSort(dst, src, half, max_thread, id * 2);
         mergeSort(dst + half, src + half, len - half, max_thread, id * 2 + 1);
         if (t.joinable())
           t.join();
         std::merge(src, src + half, src + half, src + len, dst);
      }
    }

    // 步骤 1:分块,返回块数
    int step1_SplitChunks(const fs::path& inFile) {

      // 映射源文件
       MappedFile text {inFile};
       if (!text) throw std::runtime_error("无法打开输入文件");

      // 分块,直到源文件结束
       int chunkCount = 0;
       for (auto chunkBegin = +text; (chunkBegin = text) < text.end();) {

        // 不断记录行,直到(此次遍历过的源文件大小 + 行数据数组大小 * 2 )到达内存限制
         vector<string_view> lines, sortedLines;
         while (text < text.end() && (text - chunkBegin + sizeof(string_view) * lines.size() * 2) < max_memory) {
           auto lineEnd = (char*) std::memchr(text, '\n', text.end() - text);
           auto lineLen = (lineEnd ? lineEnd : text.end()) - text;
           lines.emplace_back(text, lineLen);
           text += lineLen + 1;
        }

        // 准备写入(排序后)分块、行数据。
         ofstream chunkFile (tmpDir / (to_string(chunkCount) + ".txt"), ios::binary | ios::trunc);
         ofstream metaFile (tmpDir / (to_string(chunkCount) + ".meta"), ios::binary | ios::trunc);
         chunkCount++;

        // 多线程排序行数组
         sortedLines.resize(lines.size());
         mergeSort(lines.data(), sortedLines.data(), lines.size(), max_thread);

        // 保存(排序后)每行文本、偏移、长度
         for (auto line: sortedLines) {
           chunkFile << line;
           metaFile << Meta{line.data() - chunkBegin, line.size()};
        }

        // 检查
         if (!chunkFile || !metaFile) {
           std::stringstream buf;
           buf << "写入第 " << chunkCount << " 分块时出错!";
           throw std::runtime_error(buf.str());
        }
      }

       return chunkCount;
    }

    // 步骤 2:查找重复行
    void step2_FindDupLines(int chunkCount) {

       vector<ofstream> chunkDups;
       vector<MappedFile<>> chunkText;
       vector<MappedFile<Meta>> chunkMeta;
       std::priority_queue<Line, vector<Line>, std::greater<>> lines;

      // 映射所有分块的文本、行数据文件,
      // 也准备好记录各分块重复行数据的文件
       for (int idx = 0; idx < chunkCount; idx++) {
         chunkText.emplace_back(tmpDir / (to_string(idx) + ".txt"));
         chunkMeta.emplace_back(tmpDir / (to_string(idx) + ".meta"));
         chunkDups.emplace_back(tmpDir / (to_string(idx) + ".dups"), ios::binary | ios::trunc);
         lines.push({idx});
      }

      // 利用小根堆,按(行内容,分块号,偏移量)顺序,流式多路归并
       string_view last{};
       while (!lines.empty()) {

        // 与上一行相同,则将偏移量写入,对应分块待删除行名单内
         auto line = lines.top(); lines.pop();
         if (last == line.str && !last.empty())
           chunkDups[line.chunkIdx].write((char*)&line.offset, sizeof line.offset);
         last = line.str;

        // 该分块行数据未遍历完,则继续将下一行添加进小根堆中
         auto& text = chunkText[line.chunkIdx];
         auto& meta = chunkMeta[line.chunkIdx];
         if (meta < meta.end()) {
           lines.push({line.chunkIdx, (*meta).offset, {text, (*meta).length}});
           text += (*meta).length;
           meta++;
        }
      }

      // 检查
       for (auto&& file: chunkDups) {
         if (!file) {
           std::stringstream buf;
           buf << "保存第 " << chunkCount << " 分块删除名单时出错!";
           throw std::runtime_error(buf.str());
        }
      }
    }

    // 步骤 3:合并分块
    void step3_MergeChunks(int chunkCount, const fs::path& outFile) {

       ofstream textOut {outFile, ios::binary | ios::trunc};
       if (!textOut) throw std::runtime_error("无法打开输出文件");

       for (int idx = 0; idx < chunkCount; idx++) {

        // 映射分块(排序后)文本、行数据、删除名单
         MappedFile<> text {tmpDir / (to_string(idx) + ".txt")};
         MappedFile<Meta> meta {tmpDir / (to_string(idx) + ".meta")};
         MappedFile<decltype(Meta::offset)> dups {tmpDir / (to_string(idx) + ".dups")};

        // 剔除删除名单中的行
         vector<Line> lines; lines.reserve(meta.size);
         for (; meta < meta.end(); text += (*meta++).length) {
           if (dups < dups.end() && *dups == (*meta).offset)
             dups++;
           else
             lines.push_back({idx, (*meta).offset, {text, (*meta).length}});
        }

        // 再按偏移量顺序排序好
         std::sort(lines.begin(), lines.end(), [](auto&& a, auto&& b) {
           return a.offset < b.offset;
        });

        // 逐行输出
         for (auto&& line: lines)
           textOut << line.str << '\n';
      }

      // 检查
       if (!textOut)
         throw std::runtime_error("写入输出文件时出错!");
    }

    int main(int argc, const char* argv[]) {

       if (argc < 3) {
         std::stringstream buf;
         buf << "大文本去重并保持顺序工具\n\n"
          << "用法:" << argv[0] << " 输入文件 输出文件 "
          << "[内存限制 MB = " << (max_memory >> 20) << "] "
          << "[线程限制 = " << max_thread << "]";
         std::cerr << buf.str() << std::endl;
         return -1;
      }

       auto inFile = argv[1];
       auto outFile = argv[2];
       if (argc > 3) max_memory = (std::max)(std::stoull(argv[3]), 1ull) << 20ull;
       if (argc > 4) max_thread = (std::max)((std::min)(std::stoi(argv[4]), 256), 1);

       auto chunkCount = step1_SplitChunks(inFile);
       step2_FindDupLines(chunkCount);
       step3_MergeChunks(chunkCount, outFile);

      // 清空临时文件
       for (int i = 0; i < chunkCount; i++)
         for (auto&& suffix: {".txt", ".meta", ".dups"})
           fs::remove(tmpDir / (to_string(i) + suffix));
    }
    ```
    Keuin
        100
    Keuin  
       2024-06-08 02:02:41 +08:00
    ```shell
    awk '{print $0","NR}' input.csv | sort | sed -E 's/,[0-9]+$//' | uniq
    ```

    Example usage:

    ```
    $ cat input
    1,2,3,4
    2,3,4,5
    3,4,5,6
    4,5,6,7
    2,3,4,5
    1,2,3,4
    5,6,7,8
    $ awk '{print $0","NR}' input
    1,2,3,4,1
    2,3,4,5,2
    3,4,5,6,3
    4,5,6,7,4
    2,3,4,5,5
    1,2,3,4,6
    5,6,7,8,7
    $ awk '{print $0","NR}' input | sort
    1,2,3,4,1
    1,2,3,4,6
    2,3,4,5,2
    2,3,4,5,5
    3,4,5,6,3
    4,5,6,7,4
    5,6,7,8,7
    $ awk '{print $0","NR}' input | sort | sed -E 's/,[0-9]+$//'
    1,2,3,4
    1,2,3,4
    2,3,4,5
    2,3,4,5
    3,4,5,6
    4,5,6,7
    5,6,7,8
    $ awk '{print $0","NR}' input | sort | sed -E 's/,[0-9]+$//' | uniq
    1,2,3,4
    2,3,4,5
    3,4,5,6
    4,5,6,7
    5,6,7,8
    ```

    不管你的电脑内存是 1T 还是 1G ,都可以正确运行并得到相同输出,因为 sort 命令用的是归并排序,是外存算法。如果你要限制用到的内存大小,把 sort 改成 sort --buffer-size=100M ,即可限制只用 100M 内存,其他命令都是行缓存算法,只会保存当前行在内存里,也就是说,最大内存用量是 max(100M, max_line_size_bytes)
    1  2  
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2923 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 13:31 · PVG 21:31 · LAX 06:31 · JFK 09:31
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.