How to Implement Text Classification with Spark Deeply Integrated with TensorFlow?
Topics covered in this article:
TensorFlow programming
CNN-related knowledge
PySpark-related knowledge
Because this example is fairly complex, we assume that you have not only worked through [TensorFlow Basics], but have also expanded your TF knowledge on your own and, following the recommendations there, are familiar with CNN terminology such as windows and sliding.
This section comes with complete code. We recommend typing it into your own editor by hand rather than simply copy-pasting, to deepen your understanding.
Building on the groundwork laid in the previous chapters, this section uses SDL (spark-deep-learning) together with a CNN to implement a text intent classification example.
Assume the data is in CSV format with only two columns: a text field and a category field, both stored as strings. It looks roughly like this (a sketch of how one row becomes model input follows the sample):
最近的醫院在哪里, 醫院    (Where is the nearest hospital?, hospital)
附近哪個餐廳最好吃, 餐館    (Which nearby restaurant has the best food?, restaurant)
......
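To make the target representation concrete, here is a rough sketch of what a single row will eventually be turned into on the Spark side: a fixed-length sequence of word indices plus a one-hot label. The word ids and the class count below are made up purely for illustration.

# Illustrative only: made-up word ids and class count.
SEQUENCE_LENGTH = 64
NUM_CLASSES = 3

word_ids = [128, 7, 2045, 33]                                   # indices for the tokens of the question text
features = word_ids + [0] * (SEQUENCE_LENGTH - len(word_ids))   # padded to a fixed length

label = [0] * NUM_CLASSES
label[0] = 1                                                    # one-hot encoding of the category column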
Remember the routine we covered in [TensorFlow Basics]?
Define the format of the data you will feed in, i.e. the placeholders
Build the network structure
Define the objective you want to optimize
Choose an optimizer
Define some metrics to measure the results, such as accuracy
Open a session
Run the training process and validate the results
Close the session
This time we will follow the same routine.
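Before diving into the real network, here is a minimal, self-contained sketch of that routine on a toy problem. The shapes, the single dense layer and the random data are made up purely for illustration; the rest of this section fills in a real network.

import tensorflow as tf
import numpy as np

# 1. placeholders for the data we will feed in
x = tf.placeholder(tf.float32, [None, 4], name="x")
y = tf.placeholder(tf.float32, [None, 2], name="y")

# 2. network structure: a single fully connected layer
w = tf.Variable(tf.truncated_normal([4, 2], stddev=0.1))
b = tf.Variable(tf.zeros([2]))
logits = tf.matmul(x, w) + b

# 3. the objective to optimize
loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=y))

# 4. an optimizer
train_step = tf.train.AdamOptimizer(0.01).minimize(loss)

# 5. a metric
accuracy = tf.reduce_mean(tf.cast(tf.equal(tf.argmax(logits, 1), tf.argmax(y, 1)), tf.float32))

# 6.-8. open a session, train and validate, close the session
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    X = np.random.rand(8, 4).astype(np.float32)
    Y = np.eye(2)[np.random.randint(0, 2, 8)].astype(np.float32)
    for _ in range(10):
        _, cur_loss, cur_acc = sess.run([train_step, loss, accuracy], feed_dict={x: X, y: Y})
    print("loss %g, accuracy %g" % (cur_loss, cur_acc))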
First, define the input format:
input_x = tf.placeholder(tf.int32, [None, 64], name="input_x")
input_y = tf.placeholder(tf.float32, [None, 200], name="input_y")
The input is a two-dimensional array. The second dimension holds 64 int values; the first dimension is the number of samples fed to the algorithm at a time, which is determined at run time.
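For example, feeding a batch of 32 samples would look roughly like this (random ids, purely illustrative):

import numpy as np

batch = np.random.randint(0, 5000, size=(32, 64))   # 32 samples, each a sequence of 64 word ids
print(batch.shape)                                  # (32, 64): the None dimension becomes 32 at run time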
Next we define the network structure. As the saying goes, sharpen your tools before you start: we first write a helper method that makes it easy to build a convolutional layer:
def conv_poo_layer(input, size_in, size_out, filter_width, filter_height, include_pool=False, name="conv"):
    with tf.name_scope(name):
        w = tf.Variable(tf.truncated_normal([filter_height, filter_width, size_in, size_out], stddev=0.1), name="W")
        b = tf.Variable(tf.constant(0.1, shape=[size_out]), name="B")
        conv = tf.nn.conv2d(input, w, strides=[1, 1, 1, 1], padding="VALID")
        act = tf.nn.relu(conv + b)
        tf.summary.histogram("weights", w)
        tf.summary.histogram("biases", b)
        tf.summary.histogram("activations", act)
        if include_pool:
            return tf.nn.max_pool(act, ksize=[1, filter_height, 1, 1], strides=[1, 1, 1, 1], padding="VALID")
        else:
            return act
With the method above, creating a convolutional layer is as simple as calling conv_poo_layer. You should still study the individual parameters yourself; if they are not clear yet, you can simply treat this as a utility function for now.
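As a quick illustration (assuming tensorflow is imported as tf and the helper above is defined; the embedding dimension of 100 and window height of 5 are made up): with an input of shape (batch, 64, 100, 1) and VALID padding, the window slides over 64 - 5 + 1 = 60 positions.

# Illustrative only: made-up embedding dimension and window size.
demo_input = tf.placeholder(tf.float32, [None, 64, 100, 1])
demo_conv = conv_poo_layer(demo_input, size_in=1, size_out=16,
                           filter_width=100, filter_height=5, name="conv_demo")
print(demo_conv.shape)   # (?, 60, 1, 16)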
A convolutional layer is usually followed by a fully connected layer, which makes the subsequent classification step easier. Likewise, we write a helper method for building fully connected layers:
def fc_layer(input, size_in, size_out, active="relu", name="fc"):
    with tf.name_scope(name):
        w = tf.Variable(tf.truncated_normal([size_in, size_out], stddev=0.1), name="W_" + name)
        b = tf.Variable(tf.constant(0.1, shape=[size_out]), name="B_" + name)
        if active == "sigmoid":
            act = tf.nn.sigmoid(tf.matmul(input, w) + b)
        elif active is None:
            act = tf.matmul(input, w) + b
        else:
            act = tf.nn.relu(tf.matmul(input, w) + b)
        tf.summary.histogram("W_" + name + "_weights", w)
        tf.summary.histogram("B_" + name + "_biases", b)
        tf.summary.histogram(name + "_activations", act)
        return act
Now we can build any fully connected layer through the fc_layer method.
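For example, an illustrative call with made-up sizes, mapping a 256-dimensional input to 64 ReLU units:

# Illustrative only: made-up layer sizes.
demo_in = tf.placeholder(tf.float32, [None, 256])
demo_fc = fc_layer(demo_in, 256, 64, active="relu", name="fc_demo")
print(demo_fc.shape)   # (?, 64)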
To simulate a realistic scenario, we will build three two-layer convolutional networks, with sliding window sizes of 5, 10 and 20 respectively:
embedded_input_x = tf.expand_dims(lookup_embedding(input_x), -1)
buffer = []
for vw in [5, 10, 20]:
    conv_layout_num = 0
    pool_layout_num = 0
    conv1 = conv_poo_layer(embedded_input_x, 1, 16, filter_width=EMBEDDING_DIM, filter_height=vw,
                           name="conv1_" + str(vw))
    conv_layout_num += 1
    pool_layout_num += 0
    conv_out = conv_poo_layer(conv1, 16, 32, filter_width=1, filter_height=vw,
                              name="conv2_" + str(vw))
    conv_layout_num += 1
    pool_layout_num += 0
    flattened = tf.reshape(conv_out, [-1, (
        SEQUENCE_LENGTH + conv_layout_num + pool_layout_num - conv_layout_num * vw - pool_layout_num * vw) * 32])
    buffer.append(flattened)
In the code above, because the TensorFlow API is fairly low-level, a lot of bookkeeping has to be done by hand, so some extra code was added to help with the calculations. In fact, TensorFlow also provides many high-level APIs, such as the bundled Keras, which already do a great deal of this work and let you build a complex network in just a few lines of code. Since that is beyond the scope of this article, we will not elaborate here; interested readers can explore it on their own.
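For a sense of what those higher-level APIs look like, here is a rough sketch of a comparable multi-window text CNN using the Keras layers bundled with TensorFlow. This is only an illustration of the API style, not the exact network defined in this article; the vocabulary size, embedding dimension and class count are made up.

# Rough Keras sketch, for comparison only; not the network used in this article.
from tensorflow.keras import layers, models

inp = layers.Input(shape=(64,), dtype="int32")
emb = layers.Embedding(input_dim=5000, output_dim=100)(inp)
branches = []
for vw in [5, 10, 20]:
    c = layers.Conv1D(32, vw, activation="relu")(emb)          # one branch per window size
    branches.append(layers.GlobalMaxPooling1D()(c))
merged = layers.concatenate(branches)
hidden = layers.Dense(128, activation="relu")(merged)
out = layers.Dense(10, activation="softmax")(hidden)
model = models.Model(inp, out)
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])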
Finally, we concatenate the vectors produced by the three parallel convolutional networks into a single new vector:
final_flattened = tf.concat(buffer, 1)
Now attach a three-layer fully connected network:
fc1 = fc_layer(final_flattened, int(final_flattened.shape[1]), 1024, "relu", "fc1")
fc2 = fc_layer(fc1, 1024, 128, "relu", "fc2")
Before the final layer we apply the dropout regularization technique (keep_prob is a placeholder, defined together with the other inputs in the complete code, so that dropout can be switched off at evaluation time):
dropout_fc1 = tf.nn.dropout(fc2, keep_prob)
_logits = fc_layer(dropout_fc1, 128, NUM_CLASSES, None, "fc3")
Since this is a classification problem, we use softmax_cross_entropy_with_logits. With that we have our optimization objective:
with tf.name_scope("xent"):
    xent = tf.reduce_mean(
        tf.nn.softmax_cross_entropy_with_logits(logits=_logits, labels=input_y), name="xent"
    )
    tf.summary.scalar("xent", xent)
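To see concretely what this loss computes, here is the same calculation for a single example written out in NumPy (the logits and the one-hot label below are made up):

import numpy as np

z = np.array([2.0, 0.5, -1.0])        # made-up logits for one example
y = np.array([1.0, 0.0, 0.0])         # made-up one-hot label
p = np.exp(z) / np.exp(z).sum()       # softmax probabilities
xent_one = -(y * np.log(p)).sum()     # cross-entropy for this example
print(xent_one)                       # tf.reduce_mean then averages this over the batch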
Choose the optimizer:
with tf.name_scope("train"):
    learning_rate = tf.train.exponential_decay(INITIAL_LEARNING_RATE, global_step, 1200, 0.8, staircase=True)
    train_step = tf.train.AdamOptimizer(learning_rate).minimize(xent, global_step=global_step)
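The schedule built by tf.train.exponential_decay here starts at 0.001 and is multiplied by 0.8 every 1200 steps; staircase=True makes the decay happen in discrete jumps rather than continuously. A plain-Python sketch of the same schedule:

def decayed_learning_rate(step, initial=0.001, decay_steps=1200, decay_rate=0.8):
    # staircase=True corresponds to integer division in the exponent
    return initial * decay_rate ** (step // decay_steps)

print(decayed_learning_rate(0))     # 0.001
print(decayed_learning_rate(1200))  # 0.0008
print(decayed_learning_rate(2400))  # roughly 0.00064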
Define the evaluation metric:
with tf.name_scope("accuracy"):
    correct_prediction = tf.equal(tf.argmax(_logits, 1), tf.argmax(input_y, 1))
    accurate = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name="accuracy")
    tf.summary.scalar("accuracy", accurate)
Finally, open a session and train. A single step of the training loop looks like this (the full loop appears in the complete code below):
_, gs = sess.run([train_step, global_step],
                 feed_dict={input_x: X, input_y: Y, keep_prob: INITIAL_KEEP_PROB})
[train_accuracy, s, loss] = sess.run([accurate, summ, xent],
                                     feed_dict={input_x: X, input_y: Y, keep_prob: 1.})
The complete code is as follows:
def text_cnn_map_fun(args={}, ctx=None, _read_data=None):
    import tensorflow as tf
    import sys
    from sklearn.utils import Bunch

    # hyperparameters passed in from the Spark side via fitParam
    FLAGS = Bunch(**args["params"]["fitParam"])
    embedded_vec = FLAGS.word_embedding_bs
    NUM_CLASSES = FLAGS.num_classes
    BATCHS_SIZE = FLAGS.batch_size
    SEQUENCE_LENGTH = FLAGS.sequence_length
    EPOCH = FLAGS.epochs
    EMBEDDING_DIM = len(embedded_vec[0])
    TENSOR_BORAD_DIR = FLAGS.tensor_board_dir
    dev_collection = FLAGS.dev_collection
    INITIAL_LEARNING_RATE = 0.001
    INITIAL_KEEP_PROB = 0.9

    def conv_poo_layer(input, size_in, size_out, filter_width, filter_height, include_pool=False, name="conv"):
        with tf.name_scope(name):
            w = tf.Variable(tf.truncated_normal([filter_height, filter_width, size_in, size_out], stddev=0.1), name="W")
            b = tf.Variable(tf.constant(0.1, shape=[size_out]), name="B")
            conv = tf.nn.conv2d(input, w, strides=[1, 1, 1, 1], padding="VALID")
            act = tf.nn.relu(conv + b)
            tf.summary.histogram("weights", w)
            tf.summary.histogram("biases", b)
            tf.summary.histogram("activations", act)
            if include_pool:
                return tf.nn.max_pool(act, ksize=[1, filter_height, 1, 1], strides=[1, 1, 1, 1], padding="VALID")
            else:
                return act

    def fc_layer(input, size_in, size_out, active="relu", name="fc"):
        with tf.name_scope(name):
            w = tf.Variable(tf.truncated_normal([size_in, size_out], stddev=0.1), name="W_" + name)
            b = tf.Variable(tf.constant(0.1, shape=[size_out]), name="B_" + name)
            if active == "sigmoid":
                act = tf.nn.sigmoid(tf.matmul(input, w) + b)
            elif active is None:
                act = tf.matmul(input, w) + b
            else:
                act = tf.nn.relu(tf.matmul(input, w) + b)
            tf.summary.histogram("W_" + name + "_weights", w)
            tf.summary.histogram("B_" + name + "_biases", b)
            tf.summary.histogram(name + "_activations", act)
            return act

    tf.reset_default_graph()
    sess = tf.Session()

    input_x = tf.placeholder(tf.int32, [None, SEQUENCE_LENGTH], name="input_x")
    input_y = tf.placeholder(tf.float32, [None, NUM_CLASSES], name="input_y")
    global_step = tf.Variable(0, name='global_step', trainable=False)
    keep_prob = tf.placeholder(tf.float32, name="keep_prob")
    embeddings = tf.Variable(embedded_vec, dtype=tf.float32)

    def lookup_embedding(word_sequence):
        return tf.nn.embedding_lookup(embeddings, word_sequence)

    # three parallel convolutional branches with window sizes 5, 10 and 20
    embedded_input_x = tf.expand_dims(lookup_embedding(input_x), -1)
    buffer = []
    for vw in [5, 10, 20]:
        conv_layout_num = 0
        pool_layout_num = 0
        conv1 = conv_poo_layer(embedded_input_x, 1, 16, filter_width=EMBEDDING_DIM, filter_height=vw,
                               name="conv1_" + str(vw))
        conv_layout_num += 1
        pool_layout_num += 0
        conv_out = conv_poo_layer(conv1, 16, 32, filter_width=1, filter_height=vw,
                                  name="conv2_" + str(vw))
        conv_layout_num += 1
        pool_layout_num += 0
        flattened = tf.reshape(conv_out, [-1, (
            SEQUENCE_LENGTH + conv_layout_num + pool_layout_num - conv_layout_num * vw - pool_layout_num * vw) * 32])
        buffer.append(flattened)

    final_flattened = tf.concat(buffer, 1)
    fc1 = fc_layer(final_flattened, int(final_flattened.shape[1]), 1024, "relu", "fc1")
    fc2 = fc_layer(fc1, 1024, 128, "relu", "fc2")
    dropout_fc1 = tf.nn.dropout(fc2, keep_prob)
    _logits = fc_layer(dropout_fc1, 128, NUM_CLASSES, None, "fc3")

    with tf.name_scope("xent"):
        xent = tf.reduce_mean(
            tf.nn.softmax_cross_entropy_with_logits(logits=_logits, labels=input_y), name="xent"
        )
        tf.summary.scalar("xent", xent)

    with tf.name_scope("train"):
        learning_rate = tf.train.exponential_decay(INITIAL_LEARNING_RATE, global_step, 1200, 0.8, staircase=True)
        train_step = tf.train.AdamOptimizer(learning_rate).minimize(xent, global_step=global_step)

    with tf.name_scope("accuracy"):
        correct_prediction = tf.equal(tf.argmax(_logits, 1), tf.argmax(input_y, 1))
        accurate = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name="accuracy")
        tf.summary.scalar("accuracy", accurate)

    summ = tf.summary.merge_all()
    sess.run(tf.global_variables_initializer())

    # TensorBoard writers: one for training summaries, one for the test-set summaries
    writer = tf.summary.FileWriter(TENSOR_BORAD_DIR)
    writer.add_graph(sess.graph)
    writer0 = tf.summary.FileWriter(TENSOR_BORAD_DIR + "/0")
    writer0.add_graph(sess.graph)
    saver = tf.train.Saver()

    X_TEST = [item["features"] for item in dev_collection]
    Y_TEST = [item["label"] for item in dev_collection]

    for ep in range(EPOCH):
        print("epoch: %d" % ep)
        for items in _read_data(max_records=BATCHS_SIZE):
            X = [item["features"] for item in items]
            Y = [item["label"].toArray() for item in items]
            _, gs = sess.run([train_step, global_step],
                             feed_dict={input_x: X, input_y: Y, keep_prob: INITIAL_KEEP_PROB})
            [train_accuracy, s, loss] = sess.run([accurate, summ, xent],
                                                 feed_dict={input_x: X, input_y: Y, keep_prob: 1.})
            [test_accuracy, test_s, test_loss] = sess.run([accurate, summ, xent],
                                                          feed_dict={input_x: X_TEST, input_y: Y_TEST, keep_prob: 1.})
            print('train_accuracy %g, test_accuracy %g, loss: %g, global step: %d' % (
                train_accuracy, test_accuracy, loss, gs))
            writer.add_summary(s, gs)
            writer0.add_summary(test_s, gs)
            sys.stdout.flush()
    sess.close()
In the complete code we apply dropout regularization, use tf.summary.FileWriter to enable TensorBoard visualization, and evaluate on a test set so that we can gauge how well the algorithm generalizes. It is also worth noting that all of the model code is wrapped in a single function, text_cnn_map_fun. You can name this function whatever you like, but its parameters are fixed: exactly the three shown in the example. This function will be handed to Spark's TextEstimator.
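To make that contract explicit, here is the skeleton the estimator calls into, extracted from the complete code above. The name my_map_fun is arbitrary, as noted, and the body is just a placeholder.

def my_map_fun(args={}, ctx=None, _read_data=None):
    # args["params"]["fitParam"] carries the dict passed as fitParam on the Spark side
    params = args["params"]["fitParam"]
    # _read_data(max_records=n) yields batches of rows, each carrying "features" and "label"
    for items in _read_data(max_records=params["batch_size"]):
        pass  # build the graph and run training here, as text_cnn_map_fun does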
With the model code done, it is time to write the PySpark code. The main work is:
1. Generate word vectors from the text
2. Convert the text into sequences of numbers
3. Convert the categories into one-hot encodings
4. Use TextEstimator to wire in the model defined above
For steps 1, 2 and 3, SDL provides a utility class, EasyFeature, which handles them in just a few lines of code.
First, create the Spark session:
session = SparkSession.builder.master("local[*]").appName("test").getOrCreate()
To avoid recomputing the preprocessing every time we train, we store the preprocessed data on HDFS or local disk. For that we define a few paths:
stage_path = "/tmp/text_cnn_ask/stage_train"
dev_stage_path = "/tmp/text_cnn_ask/text_cnn_ask_dev"
word_embedding_path = "/tmp/text_cnn_ask/text_cnn_ask_embedding"
If the paths do not exist yet, compute the data:
if not os.path.exists(stage_path):
    # Load the raw source table
    # ask_question_section.question, ask_question_section.sectionname
    question_ask = session.read.csv(
        "/Users/allwefantasy/Downloads/part-00000-0a5fbd8f-546d-4a36-88b2-d5f5635cbcaf-c000.csv",
        encoding="utf-8",
        header=True,
        schema=StructType(
            [StructField("question", StringType()), StructField("attention", StringType())]))
    df = question_ask.where(f.col("question").isNotNull()).where(f.length("question") > 1)
    ef = EasyFeature(outputCol="features", textFields=["question"], excludeFields=["attention"],
                     stopwords=[" ", "", ",", ","])
    df = ef.transform(df)
    df = df.drop("features").withColumn("features", f.col("question_text_EasyFeature")).drop(
        "question_text_EasyFeature")
    word_embedding = [item["vector"] for item in ef.getWordEmbedding()]
    onehot = CategoricalOneHotTransformer(inputCols=["attention"], outputCols=["label"])
    df = onehot.transform(df)
    train, dev = df.randomSplit(weights=[0.999, 0.001])
    train.write.mode("overwrite").format("parquet").save(stage_path)
    dev.write.mode("overwrite").format("parquet").save(dev_stage_path)
    pickle.dump(word_embedding, open(word_embedding_path, "wb"))
This code first reads the CSV file into a DataFrame, then filters out rows whose question is null or has at most one character. Next, EasyFeature handles the feature engineering work; for the text field it automatically produces a column named question_text_EasyFeature, which we rename. We then extract the word-vector array and convert the attention field into a label field. Finally, the data is split into a training set and a test set.
Now we can load the training set and the test set:
train = session.read.format("parquet").load(stage_path)
dev = session.read.format("parquet").load(dev_stage_path)
dev_collection = dev.collect()
num_classes = train.union(dev).select("label").distinct().count()
print("num_classes:{},train: {},dev_collection:{},".format(num_classes, train.count(), dev.count()))
word_embedding = pickle.load(open(word_embedding_path, "rb"))
word2v_mapping_br = session.sparkContext.broadcast(word_embedding)
estimator = TextEstimator(kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                      "group_id": "sdl_1", "test_mode": False},
                          runningMode="Normal",
                          fitParam=[{"epochs": 10, "batch_size": 32, "sequence_length": 64,
                                     "num_classes": num_classes,
                                     "word_embedding_bs": word2v_mapping_br.value,
                                     "tensor_board_dir": "./tb_dir6",
                                     "dev_collection": dev_collection}],
                          mapFnParam=text_cnn_map_fun)
estimator.fit(train).collect()
Here we compute the number of classes, broadcast the word vectors, and pass them, together with the collected test set, to the model through the fitParam parameter.
The complete code is as follows:
# -*- coding: UTF-8 -*-
import os
from pyspark.sql import SparkSession
from sparkdl import TextEstimator
from sparkdl.transformers.easy_feature import EasyFeature
from pyspark.sql.types import *
import pyspark.sql.functions as f
from sparkdl.transformers.tf_text import CategoricalOneHotTransformer
import pickle
from text_cnn import text_cnn_map_fun

session = SparkSession.builder.master("local[*]").appName("test").getOrCreate()

stage_path = "/tmp/text_cnn_ask/stage_train"
dev_stage_path = "/tmp/text_cnn_ask/text_cnn_ask_dev"
word_embedding_path = "/tmp/text_cnn_ask/text_cnn_ask_embedding"

if not os.path.exists(stage_path):
    # Load the raw source table
    # ask_question_section.question, ask_question_section.sectionname
    question_ask = session.read.csv(
        "/Users/allwefantasy/Downloads/part-00000-0a5fbd8f-546d-4a36-88b2-d5f5635cbcaf-c000.csv",
        encoding="utf-8",
        header=True,
        schema=StructType(
            [StructField("question", StringType()), StructField("attention", StringType())]))
    # question_ask.groupby("section").agg(f.count("question").alias("q_num")).orderBy("q_num", ascending=False).show(200)
    df = question_ask.where(f.col("question").isNotNull()).where(f.length("question") > 10)
    ef = EasyFeature(outputCol="features", textFields=["question"], excludeFields=["attention"],
                     stopwords=[" ", "", ",", ","])
    df = ef.transform(df)
    df = df.drop("features").withColumn("features", f.col("question_text_EasyFeature")).drop(
        "question_text_EasyFeature")
    word_embedding = [item["vector"] for item in ef.getWordEmbedding()]
    onehot = CategoricalOneHotTransformer(inputCols=["attention"], outputCols=["label"])
    df = onehot.transform(df)
    train, dev = df.randomSplit(weights=[0.999, 0.001])
    train.write.mode("overwrite").format("parquet").save(stage_path)
    dev.write.mode("overwrite").format("parquet").save(dev_stage_path)
    pickle.dump(word_embedding, open(word_embedding_path, "wb"))

train = session.read.format("parquet").load(stage_path)
dev = session.read.format("parquet").load(dev_stage_path)
dev_collection = dev.collect()
num_classes = train.union(dev).select("label").distinct().count()
print("num_classes:{},train: {},dev_collection:{},".format(num_classes, train.count(), dev.count()))
word_embedding = pickle.load(open(word_embedding_path, "rb"))
word2v_mapping_br = session.sparkContext.broadcast(word_embedding)

estimator = TextEstimator(kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                      "group_id": "sdl_1", "test_mode": False},
                          runningMode="Normal",
                          fitParam=[{"epochs": 10, "batch_size": 32, "sequence_length": 64,
                                     "num_classes": num_classes,
                                     "word_embedding_bs": word2v_mapping_br.value,
                                     "tensor_board_dir": "./tb_dir6",
                                     "dev_collection": dev_collection}],
                          mapFnParam=text_cnn_map_fun)
estimator.fit(train).collect()
Finally, how do we run the code? Assume the Spark driver program is named a.py and the TensorFlow model code is in b.py. Then run:
export JAR="....../target/scala-2.11/spark-deep-learning-assembly-0.2.0-spark2.2.jar"
export PYTHONIOENCODING=utf8

./bin/spark-submit \
  --driver-memory 12g \
  --py-files $JAR,$PY_HOME/b.py \
  --jars $JAR \
  --master "local[*]" $PY_HOME/a.py

Today's quote:
I am reminded of a line by Feng Jicai: "The wind can lift a blank sheet of paper, yet it cannot blow away a butterfly, because the strength of life lies in not yielding."