add comments and add ds_block_tiny for testing #167

Open · wants to merge 2 commits into base: main
2 changes: 1 addition & 1 deletion config/ds_block_base.sh
@@ -18,7 +18,7 @@ gpt_options=" \
--train-iters 150000 \
--resume-dataloader \
--train-data bert-base \
--lazy-loader \
--no-lazy-loader \
--tokenizer-type BertWordPieceTokenizer \
--tokenizer-model-type bert-base-uncased \
--split 949,50,1 \
37 changes: 37 additions & 0 deletions config/ds_block_tiny.sh
@@ -0,0 +1,37 @@
#! /bin/bash

script_path=$(realpath $BASH_SOURCE)
script_dir=$(dirname $script_path)

config_json="$script_dir/config_block_base.json"
gpt_options=" \
--block-lm \
--bert-prob 1.0 \
--experiment-name blocklm-blank \
--model-parallel-size ${MP_SIZE} \
--num-layers 4 \
--hidden-size 768 \
--num-attention-heads 12 \
--seq-length 512 \
--max-position-embeddings 512 \
--save /root/data/checkpoints \
--train-iters 150000 \
--resume-dataloader \
--train-data test \
--no-lazy-loader \
--tokenizer-type BertWordPieceTokenizer \
--tokenizer-model-type bert-base-uncased \
--split 949,50,1 \
--distributed-backend nccl \
--lr-decay-style cosine \
--lr-decay-iters 120000 \
--lr-decay-ratio 0.05 \
--warmup .05 \
--checkpoint-activations \
--deepspeed-activation-checkpointing \
--fp16 \
"
gpt_options="${gpt_options}
--deepspeed \
--deepspeed_config ${config_json} \
"
53 changes: 39 additions & 14 deletions data_utils/corpora.py
@@ -97,7 +97,16 @@ class DataReader:
TASK_QUEUE_LIMIT = 10000000
DONE_QUEUE_LIMIT = 10000000

def tokenize_worker(self, input, output, info, tokenizer, tokenize):
def tokenize_worker(self, q_in: Queue, q_out: Queue, q_info: Queue, tokenizer, tokenize: bool):
"""
Main entry point for the multiprocessing tokenize workers.
Args:
q_in: input queue; each item is one line of raw data
q_out: result queue; items have the same per-line granularity
q_info: logging queue; each item is one log record
tokenizer: the return value of configure_data.prepare_tokenizer, an instance of data_utils.tokenization.Tokenizer
tokenize: derived from the --no-pre-tokenize command-line argument
"""
raise NotImplementedError

def print_info(self, info):
@@ -111,6 +120,9 @@ def __init__(self, writers, tokenizer=None, tokenize=False, **kwargs):
self.writers = writers

def process(self):
"""
Split the data files into individual records using a producer/consumer pattern; the detailed logic is below.
"""
if os.path.isdir(self.PATH):
paths = [os.path.join(top, name) for top, _, names in os.walk(self.PATH) for name in names]
# paths = [entry.path for entry in os.scandir(self.PATH) if
@@ -120,6 +132,7 @@ def process(self):
task_queue, done_queue, info_queue = Queue(maxsize=self.TASK_QUEUE_LIMIT), Queue(
maxsize=self.DONE_QUEUE_LIMIT), Queue()
processes = []
# create the consumer (tokenize worker) processes
for i in range(NUM_PROCESSES):
process = Process(target=self.tokenize_worker,
args=(task_queue, done_queue, info_queue, self.tokenizer, self.tokenize))
@@ -141,11 +154,13 @@ def read_input_to_queue():
for i in range(len(processes)):
task_queue.put('STOP')

# create the producer process
process = Process(target=read_input_to_queue)
process.start()
count = len(processes)
progress_bar = tqdm.tqdm()
while True:
# write data finished by the consumers to the writers
data = done_queue.get()
if data == 'COMPLETE':
count -= 1
@@ -187,16 +202,20 @@ def process_line(self, data, tokenizer, tokenize):
class PromptReader(DataReader):
is_json = True

def tokenize_worker(self, input, output, info, tokenizer, tokenize):
for row in iter(input.get, 'STOP'):
def tokenize_worker(self, q_in, q_out, q_info, tokenizer, tokenize):
"""
For PromptReader, each input line is a JSON string.
"""
for row in iter(q_in.get, 'STOP'):
if row:
if self.is_json:
row = row.rstrip()
row = json.loads(row)
# process one line of data; process_line must be implemented by the subclass
prompts, texts = self.process_line(row, tokenizer, tokenize)
for prompt, text in zip(prompts, texts):
output.put((prompt, text))
output.put("COMPLETE")
q_out.put((prompt, text))
q_out.put("COMPLETE")

@staticmethod
def write_result(data, writers):
@@ -230,12 +249,12 @@ def process_line(self, data, tokenizer, tokenize):
text_mask.append(len(content))
return (summary, summary_mask), (text, text_mask)

def tokenize_worker(self, input, output, info, tokenizer, tokenize):
for row in iter(input.get, 'STOP'):
def tokenize_worker(self, q_in, q_out, q_info, tokenizer, tokenize):
for row in iter(q_in.get, 'STOP'):
data = json.loads(row)
summary, content = self.process_line(data, tokenizer, tokenize)
output.put((summary, content))
output.put("COMPLETE")
q_out.put((summary, content))
q_out.put("COMPLETE")

@staticmethod
def write_result(data, writers):
@@ -349,6 +368,11 @@ def process_line(self, data, tokenizer, tokenize):


class TestDataset(PromptReader):
"""
When the dataset contains 2 records, /root/data/test.json should look like:
{"prompt":"", "text":""}
{"prompt":"", "text":""}
"""
PATH = '/root/data/test.json'
assert_str = "make sure to set PATH for wikipedia data_utils/corpora.py"

@@ -433,9 +457,9 @@ def print_info(self, info):
break
print_rank_0(total_dict)

def tokenize_worker(self, input, output, info, tokenizer, tokenize):
def tokenize_worker(self, q_in, q_out, q_info, tokenizer, tokenize):
source_dict = defaultdict(int)
for row in iter(input.get, 'STOP'):
for row in iter(q_in.get, 'STOP'):
row = row.rstrip()
if row:
if self.is_json:
@@ -444,11 +468,11 @@ def tokenize_worker(self, input, output, info, tokenizer, tokenize):
length = 0
for prompt, text in zip(prompts, texts):
length += len(text)
output.put((prompt, text))
q_out.put((prompt, text))
if source:
source_dict[source] += length
output.put("COMPLETE")
info.put(source_dict)
q_out.put("COMPLETE")
q_info.put(source_dict)

def process_line(self, data, tokenizer, tokenize):
source = data["meta"].get("pile_set_name", None)
@@ -511,6 +535,7 @@ def process_line(self, item, tokenizer, tokenize):
return prompts, texts


# dictionary of the named datasets
NAMED_CORPORA = {
'wikipedia': wikipedia,
'wikipedia-key': KeyReader,
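The TestDataset docstring above, together with --train-data test in config/ds_block_tiny.sh, implies a tiny JSON-lines corpus at /root/data/test.json. A hedged sketch of preparing such a file follows; the record contents are placeholders for illustration, not data included in this PR:

# Create a two-record JSON-lines corpus at the path TestDataset.PATH expects.
mkdir -p /root/data
cat > /root/data/test.json <<'EOF'
{"prompt": "", "text": "a short sample document for smoke testing"}
{"prompt": "", "text": "a second short sample document"}
EOF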
2 changes: 1 addition & 1 deletion scripts/ds_pretrain_nvidia.sh
@@ -13,7 +13,7 @@ DATESTR=$(date +"%m-%d-%H-%M")
OPTIONS_NCCL="NCCL_DEBUG=info NCCL_IB_DISABLE=0 NCCL_NET_GDR_LEVEL=2"
HOST_FILE_PATH="/workspace/hostfile"

mkdir logs
mkdir -p logs
run_cmd="${OPTIONS_NCCL} deepspeed --master_port ${MASTER_PORT} --num_nodes ${NUM_WORKERS} --num_gpus ${NUM_GPUS_PER_WORKER} --hostfile ${HOST_FILE_PATH} pretrain_glm.py ${gpt_options} 2>&1 | tee logs/log-${DATESTR}.txt"
echo ${run_cmd}
eval ${run_cmd}