for循环数据量太大_中文文本分类roberta大力出奇迹之数据量大的问题
問題描述: 筆者在文本分類場景中使用了roberta+pool+dense的三分類模型。采用預訓練模型做項目的時候經(jīng)常苦于數(shù)據(jù)太少,模型泛化性差,因此收集了1300W數(shù)據(jù)。在我嘗試暴力出奇跡的時候,遇到了部分問題,在此記錄一下。
一. 數(shù)據(jù)預處理時間長
盡管數(shù)據(jù)預處理步驟比較簡單,一般也就清洗、分詞、token2id等操作(我一般把token2id放在預處理階段做),但是由于數(shù)據(jù)量比較大時,也很耗時間。
1.分詞提速
jieba分詞開啟并行很簡單,一行代碼開啟
jieba.enable_parallel(100)但是這里注意,jieba并行的過程是一個 jieba.cut()這一動作中的,如果輸入的是多個句子,則jieba會并行處理多個句子。
# 錯誤jieba.cut('我愛北京天安門。')jieba.cut('北京是中國首都。')#正確示范jieba.cut('我愛北京天安門。\n北京是中國首都。')因此如果需要采用并行,需要先包裝多行數(shù)據(jù),再拆分出來。具體參考
temp_lines = texts[:100]_res = ' '.join((jieba.cut('\n'.join(temp_lines))))split_words = [_.lstrip(' ').rstrip(' ') for _ in _res.split('\n')]res.extend(split_words)2. 函數(shù)并行
當然,使用預訓練模型不需要對中文進行分詞,因此此處耗時主要在數(shù)據(jù)清洗,正則處理等操作上,所以方法1已經(jīng)不起作用啦。過程就是先讀取文件到內(nèi)存,然后分多個進程預處理,最終獲取所有進程處理結(jié)果,寫入文件。
import multiprocessingimport mathdef func(id, texts): print('enter %d' % (id)) res = [] for i, text in enumerate(texts): if i % 10000 == 0 and i != 0: print(i, '/', id) value=tokenizer.encode(first=text, max_len=seq_len)[0] res.append(value) print('leave %d' % (id)) return id, res def quick_func(texts, func): pool = multiprocessing.Pool(processes=CPUS) results = [] for i in range(CPUS): imin = i*math.ceil(len(texts)/CPUS) imax = min((i+1)*math.ceil(len(texts)/CPUS), len(texts)) results.append(pool.apply_async(func, (i, texts[imin:imax],))) pool.close() pool.join() print("Sub-process(es) done.") res = [] for _ in results: res.append(_.get()) res = sorted(res, key=lambda x: x[0]) texts = [] for _ in res: texts.extend(_[1]) return texts注意! func內(nèi)部出錯,子進程直接結(jié)束,不raise錯誤的。如果要調(diào)試,最好還是加上traceback查看問題
在func中完全可以使用jieba的cut,不需要開啟jieba并行。在處理數(shù)據(jù)500萬左右的時候,各個進程已經(jīng)結(jié)束,但是該函數(shù)遲遲不返回值,直到我htop查看內(nèi)存,發(fā)現(xiàn)內(nèi)存快速增長到150G之后不在增長,程序也卡住啦,發(fā)現(xiàn)可能是 raw句子和處理后的結(jié)果占用內(nèi)存過高,因此這個方法也不行了。
3. 文件并行
查看資料的時候發(fā)現(xiàn),有老哥處理幾億行文件使用linux bash來預處理,速度明顯提升,我也想嘗試,但是最終由于功力不足,失敗。折中辦法,先將文件拆分成多個文件,python并行處理多個文件并輸出結(jié)果到多個文件,然后合并多個文件到單個文件。(注意,這里會丟失各行順序,如果去重需要在外部處理,僅僅是讀取寫入文件單線程也還是挺快的)
并行讀取同一個文件或者并行寫入同一個文件是危險的,可能會寫入或者讀取混亂(錯誤)
import multiprocessingCPUS = 40IN_DATA_SPLIT_PREFIX = 'data-split-tmp-in'OUT_DATA_SPLIT_PREFIX = 'data-split-tmp-out'seq_len = 256 # 文本最大長度vocab_path = '../../basedata/chinese_wwm_ext_L-12_H-768_A-12/vocab.txt'tokenizer = LimitBertTokenizer(vocab_path) # text2 *4 < text1char_tool = CharTools()def _clean_tokenize(infile, outfile, filters=[]): fin = open(infile, 'r') fout = open(outfile, 'w') for i, line in enumerate(fin): if i % 10000 == 0 and i != 0: print(i, ' / ', infile) items = line.strip().split('\t') ###########------------######### if len(items) != 6: continue object_id, _, operator_id, title, content, action = items type = 'model' if operator_id == '0' else 'human' if type in filters: continue if action not in action2label.keys(): continue label = action2label[action] title = char_tool.clean(title) content = char_tool.clean(content) title = title[:seq_len] content = content[:seq_len] wordids, segmentids = tokenizer.encode( first=content, second=title, max_len=seq_len) fout.write(json.dumps( {'type': type, 'label': label, 'wordids': wordids, 'segmentids': segmentids})+'\n') ###########------------######### fin.close() fout.close()def parallel(_func, infile, outfile, filters=[]): os.system('split -n l/%d %s %s' % (CPUS, infile, IN_DATA_SPLIT_PREFIX)) print("split files done") pool = multiprocessing.Pool(processes=CPUS) for small_data_file_in in [_ for _ in os.listdir('.') if _.startswith(IN_DATA_SPLIT_PREFIX)]: small_data_file_out = small_data_file_in.replace( IN_DATA_SPLIT_PREFIX, OUT_DATA_SPLIT_PREFIX) pool.apply_async(_func, args=( small_data_file_in, small_data_file_out, filters,)) pool.close() pool.join() print("Sub-process(es) done.") os.system('cat %s* > %s' % (OUT_DATA_SPLIT_PREFIX, outfile)) os.system('rm %s*' % (IN_DATA_SPLIT_PREFIX)) os.system('rm %s*' % (OUT_DATA_SPLIT_PREFIX)) print("done.")二. numpy加載后占用內(nèi)存太大
之前由于機器內(nèi)存夠用+數(shù)據(jù)量不算太大,在訓練過程中我都是加載前文處理的json文件為numpy數(shù)據(jù)然后使用model.fit()進行訓練的,代碼如下
def load_from_json(filename): labels = [] wordids = [] segmentids = [] with open(filename, 'r') as f: for i, line in enumerate(f): if i % 100000 == 0 and i != 0: print('載入數(shù)據(jù):%d ' % i) item = json.loads(line.strip()) labels.append(item['label']) wordids.append(item['wordids']) wordids = np.array(wordids) segmentids = np.zeros((len(labels), seq_len), int) labels = tf.keras.utils.to_categorical(labels) [train_wordids, val_wordids, train_segmentids, val_segmentids, train_label3s, val_label3s] = train_test_split(wordids, segmentids, label3s, test_size=0.01, stratify=labels,random_state=0) return [[train_wordids, train_segmentids], [train_label3s], [val_wordids, val_segmentids], [val_label3s]]train_X,train_y,val_X,val_y=load_from_json()model.fit(train_X, train_Y, validation_data=(val_X, val_Y),)直到,我遇到了千萬數(shù)據(jù)集,首先讀取后占用機器超過150G內(nèi)存,另外python提示單個變量占用超過10%內(nèi)存,就此程序卡住,因此不得不更換方法。
- tf.data: 官方推薦的方法,但是我感覺使用json或者re都不是很方便,加上tf.function寫起來不是很方便,放棄。
- data.generator:一般generator很常見,但是很多人使用的時候都是把數(shù)據(jù)完全讀進內(nèi)存,然后在generator中實現(xiàn)shuffle和輸出batch的功能(就沒有g(shù)enerator的作用啦),這里由于數(shù)據(jù)量太大,明顯是不能讀取所有數(shù)據(jù)進內(nèi)存的。為了保持shuffle的功能,這里還是順序讀取文件,但是維持一個buffer, 在buffer中對數(shù)據(jù)進行shuffle。
具體實現(xiàn)了一個DataGenerator父類,其必須包含數(shù)據(jù)條目(可返回batch個數(shù)),因為model.fit中需要指定其迭代次數(shù)。為了保證generaotr持續(xù)有輸出,在讀取文件到末尾的時候,自動返回文件頭。另外由于是在buffer中shuffle,其不能保證文件中的各行只輸出一次(但是能保證一個epoch最多max_buffer_size個重復的),需要依據(jù)數(shù)據(jù)條目酌情設置,這里應該優(yōu)化,在達到文件末尾后等全量buffer清空后在seed到文件頭。另外,實現(xiàn)了子類,具體實現(xiàn)lines到numpy的操作。其實也可以把數(shù)據(jù)預處理和token2id放在這里,但是每個epoch都要處理一次,有點浪費時間,因此習慣把所有預處理和toekn2id放到train前的預處理腳本中。
三. 模型訓練速度慢,需要多卡訓練
在tf2之后并行更簡單了,代碼如下:
import tensorflow as tffrom keras_bert import load_trained_model_from_checkpointdef create_model(bert_train=False): bert = load_trained_model_from_checkpoint( config_path, checkpoint_path, training=False, trainable=bert_train, seq_len=SEQ_LEN,) inputs = bert.inputs[:2] dense = bert.get_layer('Encoder-12-FeedForward-Norm').output dense = tf.keras.layers.Lambda(lambda x: x[:, 1:, :])(dense) dense1 = tf.keras.layers.GlobalMaxPool1D()(dense) dense2 = tf.keras.layers.GlobalAveragePooling1D()(dense) dense = tf.keras.layers.Concatenate()([dense1, dense2]) dense = tf.keras.layers.Dense(params.dnn_units, activation='relu')(dense) dense = tf.keras.layers.Dropout(rate=params.dropout)(dense) output = tf.keras.layers.Dense( units=3, activation='softmax', name='3cls')(dense) model = tf.keras.models.Model(inputs, output) return modelos.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3,4,5,6,7"gpus = tf.config.experimental.list_physical_devices(device_type='GPU')for gpu in gpus: tf.config.experimental.set_memory_growth(gpu, True)gpus = len(os.environ["CUDA_VISIBLE_DEVICES"].split(','))strategy = tf.distribute.MirroredStrategy()with strategy.scope(): model = create_model(bert_train=False) scheduler = tf.keras.callbacks.ReduceLROnPlateau( monitor='val_loss', factor=0.5, patience=int(params.fit_opt_patience), min_delta=1e-7) loss = LossGenerate(params.model_loss) metrics = ['accuracy'] optimizer = tf.keras.optimizers.Adam(params.fit_lr) csvlogger = tf.keras.callbacks.CSVLogger(os.path.join( params.model_dir, 'log.tsv'), append=True, separator='\t') earlystop = tf.keras.callbacks.EarlyStopping( monitor='val_loss', patience=params.fit_patience) checkpoint = tf.keras.callbacks.ModelCheckpoint(filepath=os.path.join(params.model_dir, 'stage1.weight.h5'), save_weights_only=True, save_best_only=True) model.compile(loss=loss, metrics=metrics, optimizer=optimizer)只需要在strategy.scope()下定義模型就行,很簡單。但是我也遇到一個問題: 在預測時,在strategy.scope()加載存儲的模型文件報錯:
from keras_bert import get_custom_objectscustom_objects = get_custom_objects()with strategy.scope(): model = tf.keras.models.load_model( model_path, custom_objects=custom_objects)# 報錯具體錯誤google很久也沒有結(jié)果,最終發(fā)現(xiàn)在strategy.scope下載入權(quán)重文件是可以的(可能是哪里實現(xiàn)兼容性不強吧),代碼:
with strategy.scope(): model = create_model(bert_train=False) model.load_weights(os.path.join(params.model_dir, 'stage1.weight.h5'))實驗結(jié)果
最終,在6卡v100并行下, 1000萬長度384的分類模型訓練好啦。stage1為固定bert訓練結(jié)果, 01-0.4238為所有參數(shù)train的結(jié)果。發(fā)現(xiàn)了:1000W數(shù)據(jù),max-len設置為384, RoBERTa-wwm-ext 模型訓練需要接近25小時。其實還是蠻快的.... 另外:?大力出奇跡的模型效果還可以!!!
為了湊夠1萬字,放一下上文用到的LossGenerate函數(shù)
def LossGenerate(name='ce', *args, **kwargs): NAMES = ('ce', 'focal', 'dmi') kwargs = locals()['kwargs'] assert (name in NAMES), ' loss not defined!!!' if name == 'ce': return tf.keras.losses.CategoricalCrossentropy() if name == 'focal': gamma = kwargs.get('gamma', 2.) alpha = kwargs.get('alpha', 0.25) def categorical_focal_loss_fixed(y_true, y_pred): y_pred /= K.sum(y_pred, axis=-1, keepdims=True) epsilon = K.epsilon() y_pred = K.clip(y_pred, epsilon, 1. - epsilon) cross_entropy = -y_true * K.log(y_pred) loss = alpha * K.pow(1 - y_pred, gamma) * cross_entropy return K.mean(loss, axis=1) return categorical_focal_loss_fixed if name == 'dmi': def dmi_loss(y_true, y_pred): y_true = tf.transpose(y_true, perm=[1, 0]) mat = tf.matmul(y_true, y_pred) loss = -1.0 * tf.math.log(tf.math.abs(tf.linalg.det(mat)) + 0.001) return loss return dmi_loss 創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎總結(jié)
以上是生活随笔為你收集整理的for循环数据量太大_中文文本分类roberta大力出奇迹之数据量大的问题的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。