Programming Modbus Clients: Python Examples

Imagine you’re building a smart home system where your Python application needs to communicate with temperature sensors, relays, and other devices. Modbus is a widely supported protocol for this kind of device communication, and Python’s pymodbus library makes it straightforward to implement.

In this tutorial, we’ll explore hands-on examples for programming Modbus clients using pymodbus. We’ll cover both synchronous and asynchronous approaches, with complete runnable code snippets for reading holding registers, writing coils, and handling errors.

Introduction to pymodbus

pymodbus is a full-featured Modbus library for Python that supports:

  • Cross-platform compatibility

It’s the go-to library for Python developers working with Modbus devices.

Installation

First, let’s install pymodbus. Open your terminal and run:

```bash
pip install pymodbus
```

Prerequisites

For this tutorial, you’ll need:

  • Basic Python programming knowledge

Synchronous Modbus Client

Synchronous clients are simpler for beginners—they execute operations one after another, waiting for each to complete before moving to the next.

Example 1: Reading Holding Registers

Holding registers are the most common data type in Modbus, used for storing numerical values (e.g., temperature, pressure, counters).

```python
# save as sync_read_holding_registers.py
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusException

# Modbus server parameters
SERVER_HOST = "127.0.0.1"  # Replace with your server IP
SERVER_PORT = 502
SLAVE_ID = 1

# Register parameters
START_ADDRESS = 0
REGISTERS_COUNT = 3

# Create Modbus TCP client
client = ModbusTcpClient(host=SERVER_HOST, port=SERVER_PORT)

try:
    # Connect to the server
    client.connect()
    print(f"Connected to Modbus server at {SERVER_HOST}:{SERVER_PORT}")

    # Read holding registers.
    # Note: the unit keyword depends on the installed pymodbus version
    # (`unit` in 2.x, `slave` in most 3.x releases, `device_id` in the newest ones).
    response = client.read_holding_registers(
        address=START_ADDRESS,
        count=REGISTERS_COUNT,
        slave=SLAVE_ID
    )

    # Check if response is valid
    if not response.isError():
        # Extract and print the register values
        print(f"\nRead {REGISTERS_COUNT} holding registers from address {START_ADDRESS}:")
        for i, value in enumerate(response.registers):
            print(f"Register {START_ADDRESS + i}: {value}")
    else:
        print(f"Error reading registers: {response}")
except ModbusException as e:
    print(f"Modbus error: {e}")
except Exception as e:
    print(f"General error: {e}")
finally:
    # Close the connection
    client.close()
    print("\nDisconnected from Modbus server")
```
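The values in `response.registers` are raw unsigned 16-bit integers, so a quantity such as temperature usually needs a conversion step before it is meaningful. The sketch below is one possible way to do that, assuming a hypothetical device that stores signed values in tenths of a unit; the helper names and the 0.1 scale factor are illustrative and must be replaced with whatever your device's register map specifies.

```python
def to_signed16(raw: int) -> int:
    """Interpret an unsigned 16-bit register value as a two's-complement integer."""
    if not 0 <= raw <= 0xFFFF:
        raise ValueError(f"not a 16-bit register value: {raw}")
    return raw - 0x10000 if raw >= 0x8000 else raw


def to_engineering_units(raw: int, scale: float = 0.1) -> float:
    """Convert a raw register to a physical value (scale factor is an assumption)."""
    return to_signed16(raw) * scale


if __name__ == "__main__":
    # Example raw values as they might appear in response.registers
    for raw in (235, 0xFF9C):
        print(raw, "->", to_engineering_units(raw))
```

Keeping the conversion in a separate function makes it easy to unit-test without a Modbus server.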

Example 2: Writing Coils

Coils are single-bit outputs used for controlling devices like relays, motors, and LEDs (ON/OFF state).

```python
# save as sync_write_coils.py
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusException

# Modbus server parameters
SERVER_HOST = "127.0.0.1"
SERVER_PORT = 502
SLAVE_ID = 1

# Coil parameters
COIL_ADDRESS = 0   # Coil to write to
COIL_VALUE = True  # True = ON, False = OFF

# Create Modbus TCP client
client = ModbusTcpClient(host=SERVER_HOST, port=SERVER_PORT)

try:
    # Connect to the server
    client.connect()
    print(f"Connected to Modbus server at {SERVER_HOST}:{SERVER_PORT}")

    # Write to a single coil
    print(f"\nWriting {'ON' if COIL_VALUE else 'OFF'} to coil {COIL_ADDRESS}...")
    response = client.write_coil(
        address=COIL_ADDRESS,
        value=COIL_VALUE,
        slave=SLAVE_ID
    )

    # Check if write was successful
    if not response.isError():
        print(f"✓ Successfully wrote {'ON' if COIL_VALUE else 'OFF'} to coil {COIL_ADDRESS}")

        # Read back to verify
        read_response = client.read_coils(
            address=COIL_ADDRESS,
            count=1,
            slave=SLAVE_ID
        )
        if not read_response.isError():
            actual_value = bool(read_response.bits[0])
            print(f"✓ Verification: Coil {COIL_ADDRESS} is now {'ON' if actual_value else 'OFF'}")
    else:
        print(f"✗ Error writing coil: {response}")
except ModbusException as e:
    print(f"Modbus error: {e}")
except Exception as e:
    print(f"General error: {e}")
finally:
    # Close the connection
    client.close()
    print("\nDisconnected from Modbus server")
```
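One detail worth knowing when reading more than one coil: on the wire, coils are packed eight to a byte with the first coil in the least significant bit, so a response for 3 coils still carries a full byte and the decoded bit list is padded to a multiple of eight. The following standalone sketch illustrates that packing; `pack_coils` and `unpack_coils` are hypothetical helper names written for this illustration, not pymodbus functions.

```python
def pack_coils(values):
    """Pack booleans into bytes, first coil in the least significant bit."""
    data = bytearray((len(values) + 7) // 8)
    for index, value in enumerate(values):
        if value:
            data[index // 8] |= 1 << (index % 8)
    return bytes(data)


def unpack_coils(data, count=None):
    """Unpack bytes into booleans; trim the padding bits when count is given."""
    bits = [bool((byte >> bit) & 1) for byte in data for bit in range(8)]
    return bits if count is None else bits[:count]


if __name__ == "__main__":
    payload = pack_coils([True, False, True])
    print(payload.hex())                 # one byte on the wire
    print(unpack_coils(payload))         # eight bits, including padding
    print(unpack_coils(payload, count=3))
```

In practice this means slicing the returned bits to the count you requested before using them.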

Example 3: Advanced Error Handling

Proper error handling is essential for robust Modbus applications. Let’s extend our example with more detailed error handling:

```python
# save as sync_error_handling.py
from pymodbus.client import ModbusTcpClient
# pymodbus.exceptions has no TimeoutException; in pymodbus 3.x a request that
# gets no reply within the timeout is reported as ModbusIOException.
from pymodbus.exceptions import (
    ModbusException,
    ConnectionException,
    ModbusIOException
)

# Modbus server parameters
SERVER_HOST = "127.0.0.1"
SERVER_PORT = 502
SLAVE_ID = 1

# Register parameters
START_ADDRESS = 0
REGISTERS_COUNT = 3

# Create Modbus TCP client with timeout settings
client = ModbusTcpClient(
    host=SERVER_HOST,
    port=SERVER_PORT,
    timeout=2  # 2 seconds timeout
)


def handle_modbus_error(e, operation):
    """Helper function to handle different Modbus error types"""
    if isinstance(e, ConnectionException):
        return f"Connection failed for {operation}: {e}"
    elif isinstance(e, ModbusIOException):
        return f"I/O error or timeout for {operation}: {e}"
    elif isinstance(e, ModbusException):
        return f"Modbus error for {operation}: {e}"
    else:
        return f"Unexpected error for {operation}: {e}"


try:
    # Connect to the server
    if client.connect():
        print(f"Connected to Modbus server at {SERVER_HOST}:{SERVER_PORT}")

        # Read holding registers with retries (simple retry mechanism)
        max_retries = 3
        retry_count = 0
        success = False

        while retry_count < max_retries and not success:
            try:
                response = client.read_holding_registers(
                    address=START_ADDRESS,
                    count=REGISTERS_COUNT,
                    slave=SLAVE_ID
                )
                if not response.isError():
                    print(f"\nRead {REGISTERS_COUNT} holding registers successfully:")
                    for i, value in enumerate(response.registers):
                        print(f"Register {START_ADDRESS + i}: {value}")
                    success = True
                else:
                    print(f"Retry {retry_count + 1}/{max_retries}: Error reading registers: {response}")
                    retry_count += 1
            except Exception as e:
                error_msg = handle_modbus_error(e, "reading registers")
                print(f"Retry {retry_count + 1}/{max_retries}: {error_msg}")
                retry_count += 1

        if not success:
            print(f"Failed to read registers after {max_retries} attempts")
    else:
        print(f"Failed to connect to Modbus server at {SERVER_HOST}:{SERVER_PORT}")
except Exception as e:
    print(f"General error: {e}")
finally:
    # Close the connection
    client.close()
    print("\nDisconnected from Modbus server")
```
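The retry loop above retries immediately, which can hammer a device that is briefly busy. A common refinement is to wait a little longer after each failure. The sketch below shows one way to factor that into a reusable helper; `retry` is a hypothetical function written for this article (not part of pymodbus), and the delays are arbitrary example values.

```python
import time


def retry(operation, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call operation() until it succeeds, doubling the wait after each failure.

    `sleep` is injectable so the helper can be tested without real waiting.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception as exc:  # narrow this to Modbus errors in real code
            last_error = exc
            if attempt < attempts:
                sleep(base_delay * 2 ** (attempt - 1))
    raise last_error


if __name__ == "__main__":
    state = {"calls": 0}

    def flaky_read():
        # Simulated request that fails twice before succeeding
        state["calls"] += 1
        if state["calls"] < 3:
            raise OSError("temporary failure")
        return [10, 20, 30]

    print(retry(flaky_read, base_delay=0.1))
```

With pymodbus, the `operation` argument could be a small function or lambda that performs the read and raises if `response.isError()` is true.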

Asynchronous Modbus Client

Asynchronous clients are more efficient for applications that need to handle multiple connections or perform other tasks while waiting for Modbus responses. They use Python’s `asyncio` library.
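To make the benefit concrete before bringing pymodbus into it, here is a minimal sketch that uses `asyncio.sleep` as a stand-in for network latency. The `fake_read` coroutine and the 0.1 second delay are assumptions made purely for illustration; no Modbus traffic is involved.

```python
import asyncio
import time


async def fake_read(name: str, delay: float) -> str:
    """Stand-in for a Modbus request: waits `delay` seconds, then returns."""
    await asyncio.sleep(delay)
    return f"{name}: done"


async def read_all() -> list:
    # The three waits overlap, so the total time is close to the longest
    # single delay rather than the sum of all three.
    return await asyncio.gather(
        fake_read("sensor-1", 0.1),
        fake_read("sensor-2", 0.1),
        fake_read("sensor-3", 0.1),
    )


if __name__ == "__main__":
    start = time.perf_counter()
    results = asyncio.run(read_all())
    print(results, f"{time.perf_counter() - start:.2f}s")
```

A synchronous version of the same three waits would take roughly three times as long, because each call would block until the previous one finished.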

Example 4: Async Reading Holding Registers

```python
# save as async_read_holding_registers.py
import asyncio

from pymodbus.client import AsyncModbusTcpClient
from pymodbus.exceptions import ModbusException

# Modbus server parameters
SERVER_HOST = "127.0.0.1"
SERVER_PORT = 502
SLAVE_ID = 1

# Register parameters
START_ADDRESS = 0
REGISTERS_COUNT = 3


async def async_modbus_client():
    """Async function to read holding registers"""
    # Create async Modbus TCP client
    client = AsyncModbusTcpClient(host=SERVER_HOST, port=SERVER_PORT)

    try:
        # Connect to the server
        await client.connect()
        print(f"Connected to Modbus server at {SERVER_HOST}:{SERVER_PORT}")

        # Read holding registers asynchronously
        response = await client.read_holding_registers(
            address=START_ADDRESS,
            count=REGISTERS_COUNT,
            slave=SLAVE_ID
        )

        # Check if response is valid
        if not response.isError():
            # Extract and print the register values
            print(f"\nRead {REGISTERS_COUNT} holding registers from address {START_ADDRESS}:")
            for i, value in enumerate(response.registers):
                print(f"Register {START_ADDRESS + i}: {value}")
        else:
            print(f"Error reading registers: {response}")
    except ModbusException as e:
        print(f"Modbus error: {e}")
    except Exception as e:
        print(f"General error: {e}")
    finally:
        # Close the connection. In recent pymodbus 3.x releases close() is a
        # regular method, so it is not awaited.
        client.close()
        print("\nDisconnected from Modbus server")


# Run the async function
if __name__ == "__main__":
    asyncio.run(async_modbus_client())
```

Example 5: Async Writing Coils

```python
# save as async_write_coils.py
import asyncio

from pymodbus.client import AsyncModbusTcpClient
from pymodbus.exceptions import ModbusException

# Modbus server parameters
SERVER_HOST = "127.0.0.1"
SERVER_PORT = 502
SLAVE_ID = 1

# Coil parameters
COIL_ADDRESS = 0
COIL_VALUE = True  # True = ON, False = OFF


async def async_modbus_client():
    """Async function to write coils"""
    # Create async Modbus TCP client
    client = AsyncModbusTcpClient(host=SERVER_HOST, port=SERVER_PORT)

    try:
        # Connect to the server
        await client.connect()
        print(f"Connected to Modbus server at {SERVER_HOST}:{SERVER_PORT}")

        # Write to a single coil
        print(f"\nWriting {'ON' if COIL_VALUE else 'OFF'} to coil {COIL_ADDRESS}...")
        response = await client.write_coil(
            address=COIL_ADDRESS,
            value=COIL_VALUE,
            slave=SLAVE_ID
        )

        # Check if write was successful
        if not response.isError():
            print(f"✓ Successfully wrote {'ON' if COIL_VALUE else 'OFF'} to coil {COIL_ADDRESS}")

            # Read back to verify
            read_response = await client.read_coils(
                address=COIL_ADDRESS,
                count=1,
                slave=SLAVE_ID
            )
            if not read_response.isError():
                actual_value = bool(read_response.bits[0])
                print(f"✓ Verification: Coil {COIL_ADDRESS} is now {'ON' if actual_value else 'OFF'}")
        else:
            print(f"✗ Error writing coil: {response}")
    except ModbusException as e:
        print(f"Modbus error: {e}")
    except Exception as e:
        print(f"General error: {e}")
    finally:
        # Close the connection. In recent pymodbus 3.x releases close() is a
        # regular method, so it is not awaited.
        client.close()
        print("\nDisconnected from Modbus server")


# Run the async function
if __name__ == "__main__":
    asyncio.run(async_modbus_client())
```

Example 6: Async Multiple Device Communication

One of the main advantages of async is efficiently communicating with multiple devices:

```python
# save as async_multiple_devices.py
import asyncio

from pymodbus.client import AsyncModbusTcpClient
from pymodbus.exceptions import ModbusException

# Server parameters
SERVER_HOST = "127.0.0.1"
SERVER_PORT = 502

# Device configurations (slave IDs and their registers)
devices = [
    {"slave_id": 1, "reg_start": 0, "reg_count": 2, "name": "Temperature Sensor"},
    {"slave_id": 2, "reg_start": 0, "reg_count": 2, "name": "Pressure Sensor"},
    {"slave_id": 3, "reg_start": 0, "reg_count": 2, "name": "Flow Meter"}
]


async def read_device(client, device):
    """Async function to read a single device"""
    try:
        response = await client.read_holding_registers(
            address=device["reg_start"],
            count=device["reg_count"],
            slave=device["slave_id"]
        )
        if not response.isError():
            print(f"\n{device['name']} (Slave {device['slave_id']}):")
            for i, value in enumerate(response.registers):
                print(f"  Register {device['reg_start'] + i}: {value}")
        else:
            print(f"\nError reading {device['name']} (Slave {device['slave_id']}): {response}")
    except Exception as e:
        print(f"\nException reading {device['name']} (Slave {device['slave_id']}): {e}")


async def async_modbus_client():
    """Async function to communicate with multiple devices"""
    # Create async Modbus TCP client
    client = AsyncModbusTcpClient(host=SERVER_HOST, port=SERVER_PORT)

    try:
        # Connect to the server
        await client.connect()
        print(f"Connected to Modbus server at {SERVER_HOST}:{SERVER_PORT}")

        # Read all devices concurrently
        print("\nReading all devices concurrently...")

        # Create one coroutine per device read operation
        tasks = [read_device(client, device) for device in devices]

        # Wait for all of them to complete
        await asyncio.gather(*tasks)
    except Exception as e:
        print(f"General error: {e}")
    finally:
        # Close the connection. In recent pymodbus 3.x releases close() is a
        # regular method, so it is not awaited.
        client.close()
        print("\nDisconnected from Modbus server")


# Run the async function
if __name__ == "__main__":
    asyncio.run(async_modbus_client())
```
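When polling several devices at once, one slow or failing device should not stall or abort the others. The sketch below shows a possible pattern using `asyncio.wait_for` for a per-device deadline and `return_exceptions=True` so that `asyncio.gather` collects failures instead of raising the first one. The `fake_device` coroutine, the device names, and the timeout values are all made up for illustration; in a real program the wrapped coroutine would be the pymodbus read call.

```python
import asyncio


async def fake_device(name, delay, fail=False):
    """Stand-in for a device read (hypothetical, no Modbus traffic)."""
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{name} returned an error")
    return name, [1, 2]


async def read_with_timeout(name, delay, timeout, fail=False):
    """Give each device its own deadline; report a timeout as (name, None)."""
    try:
        return await asyncio.wait_for(fake_device(name, delay, fail), timeout)
    except asyncio.TimeoutError:
        return name, None


async def poll():
    # return_exceptions=True places raised exceptions in the result list,
    # in the same position as the coroutine that raised them.
    return await asyncio.gather(
        read_with_timeout("fast", 0.01, 0.2),
        read_with_timeout("slow", 1.0, 0.05),
        read_with_timeout("broken", 0.01, 0.2, fail=True),
        return_exceptions=True,
    )


if __name__ == "__main__":
    for result in asyncio.run(poll()):
        print(result)
```

The caller can then inspect each entry: a tuple with data, a tuple with `None` for a timeout, or an exception object for a failure.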

Synchronous vs Asynchronous: Which to Choose?

Let’s compare the two approaches to help you decide which one to use:

| Feature | Synchronous | Asynchronous |
|---|---|---|
| Complexity | Simple, easy to understand | More complex, requires async/await knowledge |
| Performance | Blocks execution during I/O | Non-blocking, efficient for multiple connections |
| Use Case | Simple applications, single device communication | Complex applications, multiple devices, real-time systems |
| Code Structure | Linear, procedural | Event-driven, concurrent |
| Scalability | Limited by blocking calls | Highly scalable, handles many connections |
| Error Handling | Traditional try-except | Same try-except with async considerations |

When to Use Synchronous

  • Scripting: Short-lived scripts that don’t require concurrency

When to Use Asynchronous

  • High-performance Systems: Applications requiring maximum throughput

Practical Tips for pymodbus

1. Always Use Timeouts: Set appropriate timeouts to prevent your application from hanging

2. Implement Retry Logic: Network issues happen—add simple retry mechanisms

3. Validate Responses: Always check if responses are errors before accessing data

4. Use Logging: pymodbus has built-in logging—enable it for debugging:

```python
import logging

logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
```

5. Test with Simulators: Use Modbus simulators (like Simply Modbus) for development

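6. Handle Big/Little Endian: Modbus transmits each 16-bit register big-endian, but the word order of 32-bit values spread across two registers varies between devices, so check the device documentation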

7. Close Connections Properly: Always close connections in a finally block
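The endianness tip is easiest to see with a 32-bit float stored in two consecutive registers. The sketch below uses only the standard `struct` module; `registers_to_float` and its `word_order` parameter are hypothetical names chosen for this example, and which word order applies is something only your device's documentation can tell you.

```python
import struct


def registers_to_float(registers, word_order="big"):
    """Combine two 16-bit registers into an IEEE 754 32-bit float.

    word_order="big": the first register holds the high word.
    word_order="little": the first register holds the low word.
    Bytes inside each register are treated as big-endian, as on the wire.
    """
    if len(registers) != 2:
        raise ValueError("expected exactly two registers")
    if word_order not in ("big", "little"):
        raise ValueError("word_order must be 'big' or 'little'")
    high, low = registers if word_order == "big" else reversed(registers)
    return struct.unpack(">f", struct.pack(">HH", high, low))[0]


if __name__ == "__main__":
    # 12.5 encoded as a float32 is 0x41480000
    print(registers_to_float([0x4148, 0x0000], word_order="big"))
    print(registers_to_float([0x0000, 0x4148], word_order="little"))
```

If a decoded value looks wildly wrong, swapping the word order is usually the first thing to try.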

Running the Examples

To run these examples:

1. Start a Modbus Server: Use Simply Modbus TCP Server, pymodbus server, or a real Modbus device

2. Install pymodbus: `pip install pymodbus`

3. Save the Code: Save each example as a .py file

4. Configure Parameters: Update SERVER_HOST, SERVER_PORT, and SLAVE_ID as needed

5. Run the Script: `python script_name.py`
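Rather than editing the constants in every script for step 4, the connection parameters can be read from the environment. This is only a sketch of one option: the variable names `MODBUS_HOST`, `MODBUS_PORT`, and `MODBUS_SLAVE_ID` are invented for this example, and the defaults mirror the values used in the scripts above.

```python
import os


def load_settings(env=os.environ):
    """Read connection settings from environment variables, with defaults."""
    return {
        "host": env.get("MODBUS_HOST", "127.0.0.1"),
        "port": int(env.get("MODBUS_PORT", "502")),
        "slave_id": int(env.get("MODBUS_SLAVE_ID", "1")),
    }


if __name__ == "__main__":
    settings = load_settings()
    print(f"Using {settings['host']}:{settings['port']}, slave {settings['slave_id']}")
```

The resulting values can then be assigned to `SERVER_HOST`, `SERVER_PORT`, and `SLAVE_ID` at the top of any of the examples.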

Conclusion

Python’s pymodbus library provides powerful tools for programming Modbus clients. Whether you choose the simple synchronous approach for basic applications or the more efficient asynchronous approach for complex systems, pymodbus has you covered.

Key takeaways:

  • Complete Examples: All code snippets are runnable with minimal modifications

With these examples, you’re well-equipped to start building your own Modbus client applications in Python. Remember to practice in a safe test environment (see our previous article) before deploying to production systems.

Happy coding!