1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
|
import logging from socketserver import TCPServer from collections import defaultdict
from umodbus import conf from umodbus.server.tcp import RequestHandler, get_server from umodbus.utils import log_to_stream
log_to_stream(level=logging.DEBUG)
data_store = defaultdict(int)
conf.SIGNED_VALUES = True
TCPServer.allow_reuse_address = True app = get_server(TCPServer, ('localhost', 502), RequestHandler)
def (slave_id, function_code, address): """" Return value of address. """ return data_store[address]
@app.route(slave_ids=[1], function_codes=[5, 15], addresses=list(range(0, 10))) def write_data_store(slave_id, function_code, address, value): """" Set value for address. """ data_store[address] = value
if __name__ == '__main__': try: app.serve_forever() finally: app.shutdown() app.server_close()
|
近期评论