python123平台作业答案第十二周_python周报第十二周
0.本周知識點預覽
Contextlib
Redis發布訂閱
RabbitMQ
pymysql
SQLAchemy
1.Contextlib模塊
contextlib模塊的contextmanager 可以實現用with來管理上下文,類似于 with open('test.txt','r') as f,這樣打開文件操作后就可以自動關閉文件。
1.范例一(自定義函數):
###contextlib (實現with上下文管理)
importcontextlib
list1= [1,2,3]
str1= "lk"###此處必須是這個裝飾器
@contextlib.contextmanagerdeffunc(l, s):
l.append(s)try:###執行到yield時,中斷跳出函數
yield
finally:print(l)
with func(list1, str1):print(123)print(456)
執行結果如下:
123
456[1, 2, 3, 'lk']
代碼解析:以上代碼的執行順序為:
1.加載list1,str1,contextlib.contextmanager裝飾器
2.執行with func(list1, str1)
3.執行def func(l, s), l.append(s)
4.try,yield,跳出函數回到with func(list1, str1) 執行print
5.執行完print后回到def func中的yield處,繼續往下執行
2.范例二(socket):
###利用上下文可以處理文件那樣處理類似socket,自動關閉連接
importcontextlibimportsocket
@contextlib.contextmanagerdefbase_socket(host, port):
sk=socket.socket()
sk.bind((host, port))
sk.listen(5)print(123)try:yieldskfinally:print(789)
sk.close()
with base_socket("127.0.0.1", 8888) as sock:print(456)
執行結果如下:
123
456
789
代碼解析:這個是context對socket的應用,在項目中就可以這么寫。執行順序和上個例子相同。
2.Redis 發布訂閱
自定義redis基礎類:
## redis 發布訂閱
importredisclassRedisHelper:def __init__(self):###創建redis連接對象
self.__conn = redis.Redis(host="127.0.0.1", port=6379)defpublic(self, msg, chan):###publish 方法,把信息發布到頻道上,返回消息被傳遞的訂閱者的數量
self.__conn.publish(chan, msg)returnTruedefsubscribe(self, chan):###pubsub 方法的意思是,返回一個發布或者訂閱的對象,用這個對象,你就能訂閱這個頻道,監聽給發給這些頻道的消息
pub = self.__conn.pubsub()###subscribe 方法: 訂閱頻道
pub.subscribe(chan)###parse_response 方法:解析從發布者/訂閱者命令的響應
pub.parse_response()return pub
發布者代碼:
importtest
fabu=test.RedisHelper()whileTrue:
inp= input(">>>")if inp == "exit":break
else:
fabu.public("%s" % inp, "998")
訂閱者代碼:
importtest
dingyue=test.RedisHelper()whileTrue:###subscribe 方法 -> 訂閱頻道
data = dingyue.subscribe("998")###parse_response 方法:解析從發布者/訂閱者命令的響應
print(data.parse_response())
執行結果:
1、先執行訂閱者代碼,這時會卡在這里,等待接收消息。
2、后執行發布者代碼,這時當發布消息時,訂閱者就會收到消息。
3.RabbitMQ
1.未利用exchange
生產者代碼:
importpika###第一件事,我們要做的就是與rabbitmq-server建立連接
connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1"))
channel=connection.channel()###創建一個隊列,假如隊列不存在,但是為了確保隊列成功創建,C/P兩端最好都創建隊列###durable=True 消息持久化
channel.queue_declare("hello_lk3", durable=True)###在Rabbitmq中,一個消息不能直接發送給queue, 需要經過一個exchange,后續會講到 ,現在我們只需將exchange設置為空字符串
channel.basic_publish(exchange='', routing_key="hello_lk1", body="fuck", properties=pika.BasicProperties(delivery_mode=2,))print("[x] sent 'fuck'")
connection.close()
消費者代碼:
importpika###第一件事,我們要做的就是與rabbitmq-server建立連接
connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1"))###創建一個頻道
channel =connection.channel()###創建一個隊列,假如隊列不存在,但是為了確保隊列成功創建,C/P兩端最好都創建隊列###durable=True 消息持久化
channel.queue_declare("hello_lk3", durable=True)###函數名不必須叫callback,callback函數就是將接收到的消息打印在屏幕上
defcallback(ch, mechod, properties, body):print("[%s] is received" %body)###無限循環監聽,調用callback,隊列名,no_ack的含義為,當時True時,只要訂閱到消息,立刻返回ack,這是TCP層面的,并不能確保消息成功消費###假如no_ack為False時,訂閱到消息后要處理成功后才返回ack,這是業務邏輯層面的,確保消費者成功消費消息
channel.basic_consume(callback, queue="hello_lk1", no_ack=True)print("現在開始消費消息...")
channel.start_consuming()
代碼執行結果:
生產者:
[x] sent 'fuck'Process finished with exit code 0
消費者:
現在開始消費消息...
[b'fuck'] isreceived
[b'fuck'] isreceived
[b'fuck'] isreceived
[b'fuck'] is received
代碼解析:
1.先執行消費者代碼,在執行生產者代碼,可以看到如上圖所示結果。
2.執行兩次消費者代碼,會發現生產者每生產個消息,消費者會輪訓的來消費。
3.在步驟2中,假如不想讓消費者輪訓消費而是先來先得的消費,則需要在消費者代碼中加入一行:channel.basic_qos(prefetch_count=1) 表示誰來誰取,不再按照奇偶數排列。
2.使用exchange發布訂閱(常用)
1.fanout exchange
這是處理邏輯最簡單的exchange類型,實際上它沒有任何邏輯,它把進入該exchange的消息全部轉發給每一個綁定的隊列中,如果這個exchange沒有隊列與之綁定,消息會被丟棄。然后通過exchange發送消息,routing key可以隨便填寫,因為是fanout類型的exchange,routing key不起作用。
生產者代碼:
importpika###第一件事,我們要做的就是與rabbitmq-server建立連接
connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1"))
channel=connection.channel()###創建一個exchange,生產者直接向exchange發消息,而不是隊列.###type: fanout類型:把進入該exchange的消息全部轉發給每一個綁定的隊列中,如果這個exchange沒有隊列與之綁定,消息會被丟棄
channel.exchange_declare(exchange="lk", type="fanout")
message= 'hello'channel.basic_publish(exchange='lk', routing_key='', body=message)print("send MSG: %s" %message)
connection.close()
消費者1代碼:
##exchange 版
importpika###第一件事,我們要做的就是與rabbitmq-server建立連接
connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1"))###創建一個頻道
channel =connection.channel()###創建一個exchange,假如不存在,但是為了確保其成功創建,C/P兩端最好都創建exchange###lk 為exchange名###type: fanout類型:把進入該exchange的消息全部轉發給每一個綁定的隊列中,如果這個exchange沒有隊列與之綁定,消息會被丟棄
channel.exchange_declare(exchange="lk", type='fanout')##隨機創建隊列#result = channel.queue_declare(exclusive=True)#queue_name = result.method.queue###指定創建隊列
channel.queue_declare("hello_lk5")##綁定隊列到exchange
channel.queue_bind(exchange="lk", queue="hello_lk5")###函數名不必須叫callback,callback函數就是將接收到的消息打印在屏幕上
defcallback1(ch, method, propreties, body):print("[x] 收到 %s" %body)
channel.basic_consume(callback1, queue="hello_lk5", no_ack=True)
channel.start_consuming()
消費者2代碼:
##exchange 版
importpika###第一件事,我們要做的就是與rabbitmq-server建立連接
connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1"))###創建一個頻道
channel =connection.channel()###創建一個exchange,假如不存在,但是為了確保其成功創建,C/P兩端最好都創建exchange###lk 為exchange名###type: fanout類型:把進入該exchange的消息全部轉發給每一個綁定的隊列中,如果這個exchange沒有隊列與之綁定,消息會被丟棄
channel.exchange_declare(exchange="lk", type='fanout')##隨機創建隊列#result = channel.queue_declare(exclusive=True)#queue_name = result.method.queue###指定創建隊列
channel.queue_declare("hello_lk6")##綁定隊列到exchange
channel.queue_bind(exchange="lk", queue="hello_lk6")###函數名必須叫callback,callback函數就是將接收到的消息打印在屏幕上
defcallback1(ch, method, propreties, body):print("[x] 收到 %s" %body)
channel.basic_consume(callback1, queue="hello_lk6", no_ack=True)
channel.start_consuming()
執行結果:
生產者:
send MSG: hello
Process finished with exit code 0
消費者1:
[x] 收到 b'hello'
消費者2:
[x] 收到 b'hello'
代碼解析:生產者直接向exchange發消息,這時,消費者創建并綁定隊列到exchange上,生產者一旦發布,所有隊列都會收到消息。
2.direct exchange
這種類型的交換機Fancout 類型的交換機智能一些,它會根據routing key來決定把消息具體扔到哪個消息隊列中。通過exchange發消息的時候會指定一個routing key,只有當routing key和與隊列綁定的routing key一樣的時候,消息才對發送到對應的消息隊列。即,如果與某個隊列綁定的routing key叫hello.world,則通過exchange發送的routing key必須也是hello.world,該隊列才能接收到消息(可按上述步驟進行驗證)。這種情況下,隊列之間是互斥關系,一個消息最多只能進入一個隊列。
生產者代碼:
importpika###第一件事,我們要做的就是與rabbitmq-server建立連接
connection =pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1'))
channel=connection.channel()###創建一個exchange, 類型是direct,
channel.exchange_declare(exchange='direct_logs',
type='direct')###定義關鍵字severity, 每次發消息時會指定關鍵字.
severity = "error"
###message 是要發送的消息
message = "123"
###通過定義好的exchange發送消息,關鍵字也是定好的,發送指定的消息
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)print("[x] Sent %r:%r" %(severity, message))
connection.close()
消費者1代碼:
importpika#import sys
###第一件事,我們要做的就是與rabbitmq-server建立連接
connection =pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1'))###創建一個頻道
channel =connection.channel()###創建一個exchange, 類型是direct,
channel.exchange_declare(exchange='direct_logs',
type='direct')###隨機創建一個隊列
result = channel.queue_declare(exclusive=True)
queue_name=result.method.queue###關聯關鍵字,只要生產者發布的消息關聯了以下關鍵字,訂閱者便能在綁定的隊列中收到消息
severities = ["info", "waring", "error"]###綁定關鍵字,隊列到exchange.
for severity inseverities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)print('[*] Waiting for logs. To exit press CTRL+C')###打印訂閱到的消息
defcallback(ch, method, properties, body):print("[x] %r:%r" %(method.routing_key, body))###循環監聽隊列
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
消費者2代碼:
importpika#import sys
###第一件事,我們要做的就是與rabbitmq-server建立連接
connection =pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1'))###創建一個頻道
channel =connection.channel()###創建一個exchange, 類型是direct,
channel.exchange_declare(exchange='direct_logs',
type='direct')###隨機創建一個隊列
result = channel.queue_declare(exclusive=True)
queue_name=result.method.queue###關聯關鍵字,只要生產者發布的消息關聯了以下關鍵字,訂閱者便能在綁定的隊列中收到消息
severities = ["error"]###綁定關鍵字,隊列到exchange.
for severity inseverities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)print('[*] Waiting for logs. To exit press CTRL+C')###打印訂閱到的消息
defcallback(ch, method, properties, body):print("[x] %r:%r" %(method.routing_key, body))###循環監聽隊列
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
執行結果:
1.首先執行消費者1和消費者2的代碼
2.然后執行生產者的代碼
3.可以看到消費者1、2都接收到了生產者的消息。
4.假如生產者發送帶有非"error"關鍵字的消息,則只有消費者1才能收到。
消費者1執行結果:
[*] Waiting for logs. To exit press CTRL+C
[x]'error':b'123'[x]'info':b'123'
消費者2執行結果:
[*] Waiting for logs. To exit press CTRL+C
[x]'error':b'123'
3.Topic exchange
Topic exchange是最靈活的exchange,它會把exchange的routing key與綁定隊列的routing key進行模式匹配。Routing key中可以包含 和#兩種符號,#號可以用來匹配一個或者多個單詞,*用來匹配正好一個單詞。
生產者代碼:
importpika###第一件事,我們要做的就是與rabbitmq-server建立連接
connection =pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1'))
channel=connection.channel()###創建一個exchange, 類型是topic
channel.exchange_declare(exchange='topic_logs',
type='topic')###發布者綁定exchange的關鍵字,訂閱者根據模糊匹配來訂閱
routing_key = "lk.haha.python"message= "xxoo"channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)print("[x] Sent %r:%r" %(routing_key, message))
connection.close()
消費者代碼:
importpika###第一件事,我們要做的就是與rabbitmq-server建立連接
connection =pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1'))
channel=connection.channel()###創建一個exchange, 類型是topic
channel.exchange_declare(exchange='topic_logs',
type='topic')###隨機創建一個隊列
result = channel.queue_declare(exclusive=True)
queue_name=result.method.queue###訂閱者綁定在exchange以及隊列的關鍵字模糊匹配,這里#代表0個或多個單詞,* 代表一個單詞,假如只寫一個#,代表全部匹配.
binding_keys = ["lk.#"]###根據多種匹配來綁定
for binding_key inbinding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)print('[*] Waiting for logs. To exit press CTRL+C')defcallback(ch, method, properties, body):print("[x] %r:%r" %(method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
執行結果:
1.先執行消費者代碼,后執行生產者代碼。
2.因為生產者發送的消息帶有關鍵字lk.haha.python,符合訂閱者的綁定邏輯lk.#,所以這個消費者能收到消息。
3.這個用法很方便,可以通過匹配來進行消息的選擇接收。
4.Python的SQLAchemy框架
1.MySQL 基礎 ----> 請自行百度
2.SQLAchemy
1.pymysql
importpymysql###創建一個MySQL 連接對象
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='s13')###創建一個可以操作MySQL的游標,默認獲取結果是元組,當設置cursor=pymysql.cursors.DictCursor,后獲取結果為字典
cursor =conn.cursor()#cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
###執行SQL語句
cursor.execute("select * from t10")#獲取第一行數據#row_1 = cursor.fetchone()
#獲取前n行數據#row_2 = cursor.fetchmany(3)#獲取所有數據
row_3 =cursor.fetchall()print(row_3)###mode='relative',獲取結果相對位置移動 mode= 'absolute',獲取結果絕對位置移動#cursor.scroll(-2,mode='relative')#row_3 = cursor.fetchall()#print(row_3)
###提交操作,當執行如insert update alter delete drop 等操作后要提交才能生效
conn.commit()###關閉游標
cursor.close()###關閉數據庫連接
conn.close()
執行結果:就是數據庫的操作結果,不過,庫和表都是事先從終端創建好的。
2.SQLAchemy基本操作
from sqlalchemy.ext.declarative importdeclarative_basefrom sqlalchemy importColumn, Integer, String, ForeignKey, UniqueConstraint, Indexfrom sqlalchemy.orm importsessionmaker, relationshipfrom sqlalchemy importcreate_engine###創建一個數據庫連接,連接池為5個
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s13", max_overflow=5)###創建對象的基類,默認就這么寫.
Base =declarative_base()#定義個User子類
classUsers(Base):###要創建的表名
__tablename__ = 'users'
###表的結構
id = Column(Integer, primary_key=True)
name= Column(String(32))
extra= Column(String(16))__table_args__ =(
UniqueConstraint('id', 'name', name='uix_id_name'),
Index('ix_id_name', 'name', 'extra'),
)#一對多
classFavor(Base):__tablename__ = 'favor'nid= Column(Integer, primary_key=True)
caption= Column(String(50), default='red', unique=True)classPerson(Base):__tablename__ = 'person'nid= Column(Integer, primary_key=True)
name= Column(String(32), index=True, nullable=True)
favor_id= Column(Integer, ForeignKey("favor.nid"))#多對多
classServerToGroup(Base):__tablename__ = 'servertogroup'nid= Column(Integer, primary_key=True, autoincrement=True)
server_id= Column(Integer, ForeignKey('server.id'))
group_id= Column(Integer, ForeignKey('group.id'))classGroup(Base):__tablename__ = 'group'id= Column(Integer, primary_key=True)
name= Column(String(64), unique=True, nullable=False)classServer(Base):__tablename__ = 'server'id= Column(Integer, primary_key=True, autoincrement=True)
hostname= Column(String(64), unique=True, nullable=False)
port= Column(Integer, default=22)###執行建表操作(create_all),刪表操作(drop_all)
Base.metadata.create_all(engine)#Base.metadata.drop_all(engine)
執行結果如下:
mysql>show tables;+---------------+
| Tables_in_s13 |
+---------------+
| favor |
| group |
| person |
| server |
| servertogroup |
| t10 |
| users |
+---------------+
7 rows in set (0.01 sec)
3.SQLAchemy 增刪改查
1.利用數據庫連接直接SQL語句執行
fromsqlalchemy import create_engine
###創建一個數據庫連接,連接池為5個
engine= create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s13", max_overflow=5)
engine.execute(
"INSERT INTO users (id, name, extra) VALUES (1, 'lk', 'haha')"
)
result= engine.execute('select * from users')print(result.fetchall())
執行結果:
[(1, 'lk', 'haha')]
2.利用SQLAchemy內部組件操作
在利用SQLAchemy的子類繼承模式創建表之后,創建對象,利用對象執行特定語句。
增:
obj = Users(name="alex0", extra='sb')
session.add(obj)
session.add_all([Users(name="alex1", extra='sb'),
Users(name="alex2", extra='sb'),])
session.commit()
刪:
session.query(Users).filter(Users.id > 2).delete()
session.commit()
改:
session.query(Users).filter(Users.id > 2).update({"name" : "099"})
session.query(Users).filter(Users.id> 2).update({Users.name: Users.name + "099"}, synchronize_session=False)
session.query(Users).filter(Users.id> 2).update({"num": Users.num + 1}, synchronize_session="evaluate")
session.commit()
查:
ret = session.query(Users).all()
ret= session.query(Users.name, Users.extra).all()
ret= session.query(Users).filter_by(name='alex').all()
ret= session.query(Users).filter_by(name='alex').first()
其他:
# 條件
ret= session.query(Users).filter_by(name='alex').all()
ret= session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
ret= session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
ret= session.query(Users).filter(Users.id.in_([1,3,4])).all()
ret= session.query(Users).filter(~Users.id.in_([1,3,4])).all()
ret= session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()fromsqlalchemy import and_, or_
ret= session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret= session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret=session.query(Users).filter(
or_(
Users.id< 2,
and_(Users.name== 'eric', Users.id > 3),
Users.extra!=""
)).all()
# 通配符
ret= session.query(Users).filter(Users.name.like('e%')).all()
ret= session.query(Users).filter(~Users.name.like('e%')).all()
# 限制
ret= session.query(Users)[1:2]# 排序
ret= session.query(Users).order_by(Users.name.desc()).all()
ret= session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
# 分組fromsqlalchemy.sql import func
ret= session.query(Users).group_by(Users.extra).all()
ret=session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).all()
ret=session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()
# 連表
ret= session.query(Users, Favor).filter(Users.id == Favor.nid).all()
ret= session.query(Person).join(Favor).all()
ret= session.query(Person).join(Favor, isouter=True).all()
# 組合
q1= session.query(Users.name).filter(Users.id > 2)
q2= session.query(Favor.caption).filter(Favor.nid < 2)
ret= q1.union(q2).all()
q1= session.query(Users.name).filter(Users.id > 2)
q2= session.query(Favor.caption).filter(Favor.nid < 2)
ret= q1.union_all(q2).all()
總結
以上是生活随笔為你收集整理的python123平台作业答案第十二周_python周报第十二周的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: 腾讯地图利用绘图工具测量距离
 - 下一篇: 使用 RGBA4444 與 Dither