-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlisten.py
61 lines (51 loc) · 1.37 KB
/
listen.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import pika
import json
import sys
import os
from pymongo import MongoClient
from pydantic import (BaseModel, PositiveInt)
import hashlib
from typing import Optional
# Module-level MongoDB handles used by the message handler:
# client on localhost:27017 -> database "yat" -> collection "tx2".
cli = MongoClient('localhost', 27017)
db = cli['yat']
txs = db['tx2']
class Tx(BaseModel):
    """Transaction record validated from a decoded queue message.

    ``on_message`` builds an instance from the JSON payload, overwrites
    ``hash`` with the chained hash, and stores the fields as one MongoDB
    document.
    """

    # Party identifiers. The visible code only concatenates them into the
    # hashed string (presumably account ids/addresses -- TODO confirm).
    credit: str
    debit: str
    # Validated by pydantic as an integer strictly greater than zero.
    amount: PositiveInt
    # Presumably a Unix timestamp -- TODO confirm units with the producer.
    time: int
    # Presumably the sender's signature; nothing in this file verifies it.
    sign: str
    # The explicit ``= None`` defaults keep the two fields below optional on
    # pydantic v2 as well, where a bare ``Optional[...]`` annotation is a
    # *required* field (on v1 the default was implicit).
    # Overwritten by on_message with the chained hash before storing.
    hash: Optional[str] = None
    # Optional free-form text; hashed as str(msg), i.e. "None" when absent.
    msg: Optional[str] = None
    # type  (placeholder left in the original; presumably a planned field -- TODO confirm)
def tob2b(t):
    """Return the hex-encoded BLAKE2b digest of the string ``t``.

    The string is encoded as UTF-8 before hashing; the default digest size
    is used, so the result is 128 hexadecimal characters.
    """
    encoded = t.encode("utf-8")
    return hashlib.blake2b(encoded).hexdigest()
def on_message(ch, method, properties, tx):
    """Validate one queued transaction, chain its hash, and store it.

    Registered as the consumer callback in ``main``.

    Args:
        ch: Channel the message arrived on; used to acknowledge it.
        method: Delivery metadata; only ``delivery_tag`` is read.
        properties: Message properties (unused).
        tx: Raw message body: UTF-8 encoded JSON whose keys match ``Tx``.

    Exceptions raised while decoding, parsing, validating or inserting are
    not caught here; they propagate to the caller and the message is left
    unacknowledged.
    NOTE(review): a permanently malformed message is therefore never acked;
    decide whether such messages should be rejected without requeue.
    """
    record = Tx(**json.loads(tx.decode("utf-8")))

    # NOTE: fields are concatenated without separators and a missing ``msg``
    # is hashed as the literal "None". Kept exactly as-is so new hashes stay
    # compatible with documents that are already stored.
    digest_input = (
        record.credit
        + record.debit
        + str(record.amount)
        + str(record.msg)
        + str(record.time)
    )
    this_hash = tob2b(digest_input)

    # Fetch the document with the highest _id (the most recently inserted
    # one when _id is an auto-generated ObjectId) via the documented ``sort``
    # argument instead of the deprecated $query/$orderby meta-operators.
    last = txs.find_one(sort=[('_id', -1)])
    # An empty collection yields None; start the chain from an empty
    # previous hash instead of failing on ``None['hash']``.
    last_hash = last['hash'] if last is not None else ''
    record.hash = tob2b(last_hash + this_hash)

    result = txs.insert_one(dict(record))
    if result.inserted_id:
        # A fresh dict is printed so the output does not include the _id
        # that insert_one adds to the dict it was given.
        print(dict(record))
        # Acknowledge only after the document has been stored.
        ch.basic_ack(delivery_tag=method.delivery_tag)
def main():
    """Open a blocking RabbitMQ connection to localhost and consume queue 'tx'.

    Every delivery is handed to ``on_message``; the call to
    ``start_consuming`` runs the consume loop.
    """
    params = pika.ConnectionParameters(host='localhost')
    conn = pika.BlockingConnection(params)
    chan = conn.channel()
    chan.basic_consume('tx', on_message)
    chan.start_consuming()
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C stops the consumer; the process then exits with status 0.
        # flush=True because os._exit() below ends the process without
        # flushing stdio buffers, so the message could otherwise be lost
        # when stdout is not a terminal (piped or redirected).
        print('Interrupted', flush=True)
        try:
            sys.exit(0)
        except SystemExit:
            # sys.exit() raises SystemExit, which is caught here, so the
            # process always terminates through os._exit(0).
            os._exit(0)