AndroZoo数据集下载工具

这份笔记不包含 apikey,可以公开发布

GitHub:czc6666/AndroZoo_Download —— AndroZoo 数据集下载程序:筛选超大 CSV 文件、制作下载链接、多线程下载 APK 文件(安卓恶意代码数据集)

最终代码:

import gc
import os
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

import pandas as pd
import requests
from tqdm import tqdm

# Module-wide switch read by debug_print(): messages are printed only while
# this is truthy.
debug = True


# Read a CSV file chunk by chunk and return it as one DataFrame
def czc_read_csv(path, chunksize=100000, parse_dates=('dex_date',)):
    """Load a CSV file into a single DataFrame, reading it in chunks.

    The file is read through pandas' chunk iterator so that a tqdm progress
    counter can advance once per chunk; all chunks are then concatenated.

    Args:
        path: Path of the CSV file to read.
        chunksize: Number of rows per chunk handed to ``pd.read_csv``.
        parse_dates: Value forwarded to ``pd.read_csv(parse_dates=...)``.
            The default is a tuple (not a list) so that no mutable object is
            shared between calls; a tuple is converted to a list before it
            is passed to pandas.

    Returns:
        The concatenated DataFrame, or ``None`` if reading or concatenating
        raised any exception (the error is reported through debug_print).
    """
    debug_print(f'开始读取 CSV 文件:{path}')
    starttime = time.perf_counter()
    if isinstance(parse_dates, tuple):
        parse_dates = list(parse_dates)
    try:
        # Iterate pandas' chunk reader so tqdm can show per-chunk progress.
        chunks = list(tqdm(pd.read_csv(path, parse_dates=parse_dates, chunksize=chunksize)))
        # Merge every chunk into one DataFrame with a fresh 0..n-1 index.
        df = pd.concat(chunks, ignore_index=True)
        endtime = time.perf_counter()
        debug_print(f'CSV 文件读取完毕,用时:{endtime - starttime:.2f}秒')
        return df
    except Exception as e:
        # Best-effort loader: callers test for None instead of catching.
        debug_print(f'读取 CSV 文件失败:{e}')
        return None


# Select the APKs matching the config and write their SHA256 values to a TXT file
def czc_filter_apk(config, output_dir, csv_path='latest.csv'):
    """Filter the CSV index and export the SHA256 of every matching row.

    A row is kept when the year of ``dex_date`` lies in
    ``[start_year, end_year]``, ``vt_detection`` equals 0, ``dex_size`` is
    below ``dex_size_limit`` and ``apk_size`` is below ``apk_size_limit``.

    Args:
        config: Mapping with the keys ``start_year``, ``end_year``,
            ``dex_size_limit`` and ``apk_size_limit``.
        output_dir: Existing directory that receives the TXT file.
        csv_path: CSV file to read via czc_read_csv.

    Returns:
        Path of the written TXT file (one SHA256 per line), or ``None`` when
        the CSV could not be read, a required column is missing or not
        date-typed, or the TXT file could not be written.

    Raises:
        KeyError: If ``config`` lacks one of the required keys.
    """
    debug_print('开始筛选 APK')
    start_year_filter = config['start_year']
    end_year_filter = config['end_year']
    dex_size_limit = config['dex_size_limit']
    apk_size_limit = config['apk_size_limit']

    debug_print('文件读取中')
    df = czc_read_csv(csv_path, parse_dates=['dex_date'])
    if df is None:
        return None

    debug_print('筛选 APK 中')
    try:
        dex_year = df['dex_date'].dt.year
        mask = (
            dex_year.between(start_year_filter, end_year_filter)  # inclusive on both ends
            & (df['vt_detection'] == 0)
            & (df['dex_size'] < dex_size_limit)
            & (df['apk_size'] < apk_size_limit)
        )
        # Drop missing hashes so every exported line is a real string.
        sha256_list = df.loc[mask, 'sha256'].dropna().tolist()
    except (KeyError, AttributeError) as e:
        # KeyError: a column is absent from the CSV. AttributeError: dex_date
        # was not parsed as datetime, so ``.dt`` is unavailable. Report it the
        # same way as the other failure paths instead of crashing.
        debug_print(f'筛选 APK 失败:{e}')
        return None

    filtered_conditions = f"start_year_{start_year_filter}_end_year_{end_year_filter}_dex_size_{dex_size_limit}_apk_size_{apk_size_limit}"
    filtered_file = os.path.join(output_dir, f'筛选后apk_{filtered_conditions}.txt')
    debug_print('apk筛选完成')

    debug_print('保存 SHA256 到文件')
    try:
        with open(filtered_file, 'w') as f:
            f.writelines(f'{sha}\n' for sha in sha256_list)
        debug_print('apk筛选导出到txt完成')
    except OSError as e:
        debug_print(f'保存 SHA256 到文件失败:{e}')
        return None

    # Drop the big frame and its derived Series before returning, then force
    # a collection so the memory is released ahead of the download phase.
    del df, dex_year, mask
    gc.collect()
    debug_print('垃圾回收完成')

    return filtered_file


# Download APKs one at a time and record every finished download
def czc_download_apk(apikey, filtered_file, output_dir, target_count=10000):
    """Sequentially download APKs listed in ``filtered_file``.

    Hashes already present in the "downloaded" log (a TXT file next to the
    output, named after ``filtered_file``) are skipped and count toward
    ``target_count``. Remaining candidates are tried in random order until
    the target is reached or no candidate is left; a failed candidate is
    simply replaced by the next one.

    Args:
        apikey: AndroZoo API key sent with every request.
        filtered_file: TXT file with one SHA256 per line.
        output_dir: Directory holding the log file and the ``apks`` folder.
        target_count: Total number of logged downloads to reach.

    Returns:
        None. APKs are written to ``<output_dir>/apks/<sha256>.apk``.
    """
    debug_print('开始下载 APK')
    filtered_conditions = os.path.splitext(os.path.basename(filtered_file))[0]
    downloaded_file = os.path.join(output_dir, f'已下载apk_{filtered_conditions}.txt')

    # Append mode creates the log when missing and never truncates it.
    open(downloaded_file, 'a').close()

    # Blank lines are ignored so they are neither counted nor requested.
    with open(filtered_file, 'r') as f:
        candidates = {line.strip() for line in f if line.strip()}
    with open(downloaded_file, 'r') as f:
        downloaded = {line.strip() for line in f if line.strip()}

    to_download = list(candidates - downloaded)
    random.shuffle(to_download)

    download_dir = os.path.join(output_dir, 'apks')
    os.makedirs(download_dir, exist_ok=True)
    debug_print(f'下载目录已创建:{download_dir}')

    api_url = 'https://androzoo.uni.lu/api/download'

    def fetch(sha256):
        """Stream one APK to disk; return True once it is saved and logged."""
        debug_print(f'尝试下载 APK:{sha256}')
        apk_path = os.path.join(download_dir, sha256 + '.apk')
        part_path = apk_path + '.part'
        try:
            # stream=True keeps the body out of memory; it is copied to disk
            # block by block instead of being held in response.content.
            with requests.get(api_url, params={'apikey': apikey, 'sha256': sha256},
                              stream=True, verify=True, timeout=10) as response:
                if response.status_code != 200:
                    debug_print(f'下载失败:{sha256}, 状态码:{response.status_code}')
                    return False
                with open(part_path, 'wb') as file:
                    for block in response.iter_content(chunk_size=1024 * 1024):
                        file.write(block)
            # Rename only after the full body is on disk, so an interrupted
            # transfer never leaves a truncated .apk behind.
            os.replace(part_path, apk_path)
            with open(downloaded_file, 'a') as f:
                f.write(sha256 + '\n')
        except Exception as e:
            # Deliberately broad: one bad download must not abort the batch.
            debug_print(f'下载错误:{sha256},错误信息:{e}')
            try:
                os.remove(part_path)
            except OSError:
                pass  # nothing partial to clean up
            return False
        debug_print(f'下载成功:{sha256}')
        return True

    done = len(downloaded)
    # Start the bar at the number of downloads already logged.
    with tqdm(total=target_count, desc='下载进度', initial=done) as pbar:
        while done < target_count and to_download:
            if fetch(to_download.pop()):
                done += 1
                pbar.update(1)


# Download APK files with a pool of worker threads
def czc_download_apk_multithreaded(apikey, filtered_file, output_dir, target_count=10000, num_threads=200):
    """Download APKs listed in ``filtered_file`` using a thread pool.

    Hashes already present in the "downloaded" log (a TXT file next to the
    output, named after ``filtered_file``) are skipped and count toward
    ``target_count``. Candidates are submitted in random order, in batches
    sized to the number of downloads still missing; after each batch only
    the successful downloads are counted, so failures are replaced by fresh
    candidates until the target is reached or none are left.

    Args:
        apikey: AndroZoo API key sent with every request.
        filtered_file: TXT file with one SHA256 per line.
        output_dir: Directory holding the log file and the ``apks`` folder.
        target_count: Total number of logged downloads to reach.
        num_threads: ``max_workers`` of the thread pool.

    Returns:
        None. APKs are written to ``<output_dir>/apks/<sha256>.apk``.
    """
    debug_print('开始多线程下载 APK')
    filtered_conditions = os.path.splitext(os.path.basename(filtered_file))[0]
    downloaded_file = os.path.join(output_dir, f'已下载apk_{filtered_conditions}.txt')

    # Append mode creates the log when missing and never truncates it.
    open(downloaded_file, 'a').close()

    # Blank lines are ignored so they are neither counted nor requested.
    with open(filtered_file, 'r') as f:
        candidates = {line.strip() for line in f if line.strip()}
    with open(downloaded_file, 'r') as f:
        downloaded = {line.strip() for line in f if line.strip()}

    to_download = list(candidates - downloaded)
    random.shuffle(to_download)

    download_dir = os.path.join(output_dir, 'apks')
    os.makedirs(download_dir, exist_ok=True)
    debug_print(f'下载目录已创建:{download_dir}')

    api_url = 'https://androzoo.uni.lu/api/download'
    # Serialises the shared side effects of the workers: appending to the
    # log file and advancing the progress bar.
    record_lock = threading.Lock()

    def download_task(sha256, pbar):
        """Stream one APK to disk; return True once it is saved and logged."""
        debug_print(f'尝试下载 APK:{sha256}')
        apk_path = os.path.join(download_dir, sha256 + '.apk')
        part_path = apk_path + '.part'
        try:
            # stream=True keeps the body out of memory; with many workers,
            # buffering whole APKs in response.content adds up quickly.
            with requests.get(api_url, params={'apikey': apikey, 'sha256': sha256},
                              stream=True, verify=True, timeout=10) as response:
                if response.status_code != 200:
                    debug_print(f'下载失败:{sha256}, 状态码:{response.status_code}')
                    return False
                with open(part_path, 'wb') as file:
                    for block in response.iter_content(chunk_size=1024 * 1024):
                        file.write(block)
            # Rename only after the full body is on disk, so an interrupted
            # transfer never leaves a truncated .apk behind.
            os.replace(part_path, apk_path)
            with record_lock:
                with open(downloaded_file, 'a') as f:
                    f.write(sha256 + '\n')
                pbar.update(1)
        except Exception as e:
            # Deliberately broad: one bad download must not abort the batch.
            debug_print(f'下载错误:{sha256},错误信息:{e}')
            try:
                os.remove(part_path)
            except OSError:
                pass  # nothing partial to clean up
            return False
        debug_print(f'下载成功:{sha256}')
        return True

    done = len(downloaded)
    # Start the bar at the number of downloads already logged.
    with tqdm(total=target_count, desc='下载进度', initial=done) as pbar:
        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            while done < target_count and to_download:
                # Submit exactly as many tasks as downloads are still missing.
                batch_size = min(target_count - done, len(to_download))
                futures = [
                    executor.submit(download_task, to_download.pop(), pbar)
                    for _ in range(batch_size)
                ]
                # download_task returns a bool; count only the successes so
                # that failures are retried with new candidates next round.
                done += sum(1 for future in futures if future.result())
    debug_print('多线程下载完成')


# Debug printing
def debug_print(message):
    """Print ``message`` behind the emoji prefix while the ``debug`` flag is truthy."""
    if not debug:
        return
    print('🤪', message)


# Build (and create) the output directory for this run
def 生成下载目录(download_dir, config=None):
    """Create and return a dated, settings-tagged directory under ``download_dir``.

    The sub-directory name is today's date (``YYYYMMDD``), an underscore, and
    the string form of every value in ``config`` joined by underscores.

    Args:
        download_dir: Parent directory; an empty string yields a relative path.
        config: Mapping whose values tag the directory name. When omitted,
            the module-level ``configs`` (assigned by the script entry
            point) is used, which keeps existing one-argument calls working.

    Returns:
        The path of the directory, which exists after the call.

    Raises:
        NameError: If ``config`` is omitted and no global ``configs`` exists.
    """
    if config is None:
        # Backward-compatible fallback to the global set under __main__.
        config = configs
    tag = '_'.join(str(value) for value in config.values())
    output_subdir = datetime.now().strftime("%Y%m%d") + '_' + tag
    output_dir = os.path.join(download_dir, output_subdir)
    debug_print(f'生成下载目录名字:{output_dir}')

    os.makedirs(output_dir, exist_ok=True)
    debug_print('创建下载目录完成')

    return output_dir


if __name__ == '__main__':
    # Script settings. The API key below is a placeholder to be filled in.
    apikey = '这写key'
    csv_path = 'latest.csv'
    # Base download path, e.g. 'D:/downloads'; empty means the current directory.
    download_dir = ''
    num_threads = 200
    apk_target_count = 20000
    # Filter settings; also read as a global by 生成下载目录 to name the folder.
    configs = dict(
        start_year=2017,
        end_year=2018,
        dex_size_limit=512 * 1024,
        apk_size_limit=512 * 1024 * 1024,
    )

    output_dir = 生成下载目录(download_dir)

    filtered_file = czc_filter_apk(configs, output_dir, csv_path)
    if not filtered_file:
        debug_print('筛选 APK 失败,无法进行下载')
    else:
        czc_download_apk_multithreaded(apikey, filtered_file, output_dir, apk_target_count, num_threads)