diff --git a/config/ds_block_base.sh b/config/ds_block_base.sh
index 93ea290..441779e 100644
--- a/config/ds_block_base.sh
+++ b/config/ds_block_base.sh
@@ -18,7 +18,7 @@ gpt_options=" \
        --train-iters 150000 \
        --resume-dataloader \
        --train-data bert-base \
-       --lazy-loader \
+       --no-lazy-loader \
        --tokenizer-type BertWordPieceTokenizer \
        --tokenizer-model-type bert-base-uncased \
        --split 949,50,1 \
diff --git a/config/ds_block_tiny.sh b/config/ds_block_tiny.sh
new file mode 100644
index 0000000..baf1dec
--- /dev/null
+++ b/config/ds_block_tiny.sh
@@ -0,0 +1,37 @@
+#! /bin/bash
+
+script_path=$(realpath $BASH_SOURCE)
+script_dir=$(dirname $script_path)
+
+config_json="$script_dir/config_block_base.json"
+gpt_options=" \
+       --block-lm \
+       --bert-prob 1.0 \
+       --experiment-name blocklm-blank \
+       --model-parallel-size ${MP_SIZE} \
+       --num-layers 4 \
+       --hidden-size 768 \
+       --num-attention-heads 12 \
+       --seq-length 512 \
+       --max-position-embeddings 512 \
+       --save /root/data/checkpoints \
+       --train-iters 150000 \
+       --resume-dataloader \
+       --train-data test \
+       --no-lazy-loader \
+       --tokenizer-type BertWordPieceTokenizer \
+       --tokenizer-model-type bert-base-uncased \
+       --split 949,50,1 \
+       --distributed-backend nccl \
+       --lr-decay-style cosine \
+       --lr-decay-iters 120000 \
+       --lr-decay-ratio 0.05 \
+       --warmup .05 \
+       --checkpoint-activations \
+       --deepspeed-activation-checkpointing \
+       --fp16 \
+"
+gpt_options="${gpt_options}
+       --deepspeed \
+       --deepspeed_config ${config_json} \
+"
diff --git a/data_utils/corpora.py b/data_utils/corpora.py
index 4374946..defd6b7 100755
--- a/data_utils/corpora.py
+++ b/data_utils/corpora.py
@@ -97,7 +97,16 @@ class DataReader:
     TASK_QUEUE_LIMIT = 10000000
     DONE_QUEUE_LIMIT = 10000000
 
-    def tokenize_worker(self, input, output, info, tokenizer, tokenize):
+    def tokenize_worker(self, q_in: Queue, q_out: Queue, q_info: Queue, tokenizer, tokenize: bool):
+        """
+        Main entry point for the tokenizer worker processes.
+        Args:
+            q_in: input queue; each item is one line of data
+            q_out: result queue; items are the per-line results
+            q_info: logging queue; each item is one log record
+            tokenizer: returned by configure_data.prepare_tokenizer; an instance of data_utils.tokenization.Tokenizer
+            tokenize: taken from the --no-pre-tokenize command-line argument
+        """
         raise NotImplementedError
 
     def print_info(self, info):
@@ -111,6 +120,9 @@ def __init__(self, writers, tokenizer=None, tokenize=False, **kwargs):
         self.writers = writers
 
     def process(self):
+        """
+        Producer/consumer pipeline that splits the data files into individual records; the detailed logic follows below.
+        """
         if os.path.isdir(self.PATH):
             paths = [os.path.join(top, name) for top, _, names in os.walk(self.PATH) for name in names]
             # paths = [entry.path for entry in os.scandir(self.PATH) if
@@ -120,6 +132,7 @@
         task_queue, done_queue, info_queue = Queue(maxsize=self.TASK_QUEUE_LIMIT), Queue(
             maxsize=self.DONE_QUEUE_LIMIT), Queue()
         processes = []
+        # Start the consumer processes
         for i in range(NUM_PROCESSES):
             process = Process(target=self.tokenize_worker,
                               args=(task_queue, done_queue, info_queue, self.tokenizer, self.tokenize))
@@ -141,11 +154,13 @@
             for i in range(len(processes)):
                 task_queue.put('STOP')
 
+        # Start the producer process
         process = Process(target=read_input_to_queue)
         process.start()
         count = len(processes)
         progress_bar = tqdm.tqdm()
         while True:
+            # Write records finished by the consumers to the writers
             data = done_queue.get()
             if data == 'COMPLETE':
                 count -= 1
@@ -187,16 +202,20 @@ def process_line(self, data, tokenizer, tokenize):
 class PromptReader(DataReader):
     is_json = True
 
-    def tokenize_worker(self, input, output, info, tokenizer, tokenize):
-        for row in iter(input.get, 'STOP'):
+    def tokenize_worker(self, q_in, q_out, q_info, tokenizer, tokenize):
+        """
+        For the PromptReader class, each line is one JSON string.
+        """
+        for row in iter(q_in.get, 'STOP'):
             if row:
                 if self.is_json:
                     row = row.rstrip()
                     row = json.loads(row)
+                # Process one line of data; subclasses must implement process_line
                 prompts, texts = self.process_line(row, tokenizer, tokenize)
                 for prompt, text in zip(prompts, texts):
-                    output.put((prompt, text))
-        output.put("COMPLETE")
+                    q_out.put((prompt, text))
+        q_out.put("COMPLETE")
 
     @staticmethod
     def write_result(data, writers):
@@ -230,12 +249,12 @@ def process_line(self, data, tokenizer, tokenize):
             text_mask.append(len(content))
         return (summary, summary_mask), (text, text_mask)
 
-    def tokenize_worker(self, input, output, info, tokenizer, tokenize):
-        for row in iter(input.get, 'STOP'):
+    def tokenize_worker(self, q_in, q_out, q_info, tokenizer, tokenize):
+        for row in iter(q_in.get, 'STOP'):
             data = json.loads(row)
             summary, content = self.process_line(data, tokenizer, tokenize)
-            output.put((summary, content))
-        output.put("COMPLETE")
+            q_out.put((summary, content))
+        q_out.put("COMPLETE")
 
     @staticmethod
     def write_result(data, writers):
@@ -349,6 +368,11 @@ def process_line(self, data, tokenizer, tokenize):
 
 
 class TestDataset(PromptReader):
+    """
+    With a dataset of two records, /root/data/test.json should look like:
+    {"prompt":"", "text":""}
+    {"prompt":"", "text":""}
+    """
     PATH = '/root/data/test.json'
     assert_str = "make sure to set PATH for wikipedia data_utils/corpora.py"
 
@@ -433,9 +457,9 @@ def print_info(self, info):
                 break
         print_rank_0(total_dict)
 
-    def tokenize_worker(self, input, output, info, tokenizer, tokenize):
+    def tokenize_worker(self, q_in, q_out, q_info, tokenizer, tokenize):
         source_dict = defaultdict(int)
-        for row in iter(input.get, 'STOP'):
+        for row in iter(q_in.get, 'STOP'):
             row = row.rstrip()
             if row:
                 if self.is_json:
@@ -444,11 +468,11 @@ def tokenize_worker(self, input, output, info, tokenizer, tokenize):
             length = 0
             for prompt, text in zip(prompts, texts):
                 length += len(text)
-                output.put((prompt, text))
+                q_out.put((prompt, text))
             if source:
                 source_dict[source] += length
-        output.put("COMPLETE")
-        info.put(source_dict)
+        q_out.put("COMPLETE")
+        q_info.put(source_dict)
 
     def process_line(self, data, tokenizer, tokenize):
         source = data["meta"].get("pile_set_name", None)
@@ -511,6 +535,7 @@ def process_line(self, item, tokenizer, tokenize):
         return prompts, texts
 
 
+# Registry of named datasets
 NAMED_CORPORA = {
     'wikipedia': wikipedia,
     'wikipedia-key': KeyReader,
diff --git a/scripts/ds_pretrain_nvidia.sh b/scripts/ds_pretrain_nvidia.sh
index eac912f..29820e1 100644
--- a/scripts/ds_pretrain_nvidia.sh
+++ b/scripts/ds_pretrain_nvidia.sh
@@ -13,7 +13,7 @@ DATESTR=$(date +"%m-%d-%H-%M")
 OPTIONS_NCCL="NCCL_DEBUG=info NCCL_IB_DISABLE=0 NCCL_NET_GDR_LEVEL=2"
 HOST_FILE_PATH="/workspace/hostfile"
 
-mkdir logs
+mkdir -p logs
 run_cmd="${OPTIONS_NCCL} deepspeed --master_port ${MASTER_PORT} --num_nodes ${NUM_WORKERS} --num_gpus ${NUM_GPUS_PER_WORKER} --hostfile ${HOST_FILE_PATH} pretrain_glm.py ${gpt_options} 2>&1 | tee logs/log-${DATESTR}.txt"
 echo ${run_cmd}
 eval ${run_cmd}
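The process / tokenize_worker split documented above is a standard multiprocessing producer/consumer pipeline built on two sentinels: the producer pushes one 'STOP' per consumer so every worker loop terminates, and each consumer answers with a final 'COMPLETE' so the write loop knows when all output has been drained. Below is a minimal, self-contained sketch of that protocol; worker, producer, and the uppercase transform are illustrative stand-ins, not code from this repo.

from multiprocessing import Process, Queue

NUM_PROCESSES = 4

def worker(q_in, q_out):
    # Consumer, analogous to tokenize_worker: drain q_in until the
    # 'STOP' sentinel arrives, then signal completion with 'COMPLETE'.
    for row in iter(q_in.get, 'STOP'):
        q_out.put(row.upper())  # stand-in for per-line tokenization
    q_out.put('COMPLETE')

def producer(q_in, lines):
    # Producer, analogous to read_input_to_queue: enqueue every input
    # line, then one 'STOP' sentinel per consumer so each loop ends.
    for line in lines:
        q_in.put(line)
    for _ in range(NUM_PROCESSES):
        q_in.put('STOP')

if __name__ == '__main__':
    q_in, q_out = Queue(), Queue()
    consumers = [Process(target=worker, args=(q_in, q_out)) for _ in range(NUM_PROCESSES)]
    for p in consumers:
        p.start()
    Process(target=producer, args=(q_in, ['a', 'b', 'c'])).start()
    remaining = len(consumers)
    while remaining:  # mirrors the write loop in DataReader.process
        data = q_out.get()
        if data == 'COMPLETE':
            remaining -= 1
        else:
            print(data)  # stand-in for write_result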
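Under that contract, a new corpus reader only has to point PATH at its data and implement process_line, which must return two parallel lists that tokenize_worker zips into (prompt, text) pairs for q_out. The hypothetical subclass below sketches the minimum; the class name, path, and JSON field names are placeholders, not part of the repo.

from data_utils.corpora import PromptReader

class MyCorpus(PromptReader):
    PATH = '/root/data/my_corpus.json'  # one JSON object per line
    is_json = True

    def process_line(self, data, tokenizer, tokenize):
        # Return parallel prompt/text lists; a real reader would run
        # `tokenizer` here when `tokenize` is set.
        return [data.get("prompt", "")], [data.get("text", "")]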
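The TestDataset docstring pins down the on-disk format, so a matching /root/data/test.json can be generated with a few lines of Python (the text values here are arbitrary placeholders):

import json

samples = [
    {"prompt": "", "text": "first tiny training document"},
    {"prompt": "", "text": "second tiny training document"},
]
with open('/root/data/test.json', 'w') as f:
    for sample in samples:
        f.write(json.dumps(sample) + '\n')

With that file in place, the tiny config should exercise the pipeline end to end. Two caveats: config/ds_block_tiny.sh reads ${MP_SIZE}, which the launcher or environment must define, and --train-data test has to resolve to TestDataset through the NAMED_CORPORA registry, a registration that sits outside the hunks shown here. Assuming the launcher sources its config-script argument (run_cmd consumes the ${gpt_options} the config defines), the invocation would look like bash scripts/ds_pretrain_nvidia.sh config/ds_block_tiny.sh.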