PstRotator and SATNOGs

I was able to control my 2m/70cm rotatable beam from SatNOGS using PstRotator software and custom protocol proxy written in python. The proxy is an ugly hack but it works for me, so hope it will be useful to someone. The PstRotator software supports a wast amount of rotors, it would be beneficial to use it in SatNOGS. Unfortunately at this moment PstRotator does not support SatNOGS version of hamlib (dont ask), so some proxy required in order to translate rotctl commands to something PstRotator will understand.

My configuration consists of RasPi running SatNOGs and Windows PC running PstRotator. The cheap Channel Master rotator controlled by PstRotator over USB-UIRT module (again, does not matter what rotor you have as long as it is supported by PstRotator).

You will need to specify the ROT_MODEL_NETROTCTL in your SatNOGS setup, along with the port and IP of your Windows PC (the one which will be running proxy).

In your PstRotator you will need to enable Setup --> UDP control option.

Save the script below to a file and update if you have different port/IP of your Windows box (I mentioned already that this script is an ugly hack).

Works with python 2.7 (python 3 needs some work). To start, issue “python filename.py”
Feel free to share or improve, this is not supported code by any means.

#!/usr/bin/env python2.7
#
#   Andriy Rokhmanov - w9km@yahoo.com
#   Protocol Proxy between PstRotator and Hamlib rotctl
#   Based on:
#   Project Horus - Rotator Abstraction Layers
#
#   Copyright (C) 2018  Mark Jessop <vk5qi@rfhead.net>
#   Released under GNU GPL v3 or later
#
import socket
import time
import logging
from threading import Thread

class PSTRotator(object):
    """ PSTRotator communication class """

    # Local store of current azimuth/elevation
    current_azimuth = None
    current_elevation = None

    azel_thread_running = False
    service_thread_running = False
    last_poll_time = 0

    def __init__(self, hostname='localhost', port=12000, poll_rate=1, service_port=4533):
        """ Start a PSTRotator connection instance """
        self.hostname = hostname
        self.port = port
        self.poll_rate = poll_rate
        self.service_port = service_port
        self.azel_thread_running = True
        self.service_thread_running = True

        self.t_rx = Thread(target=self.azel_rx_loop)
        self.t_rx.start()

        self.t_poll = Thread(target=self.azel_poll_loop)
        self.t_poll.start()

        #self.t_service = Thread(target=self.service_loop)
        #self.t_service.start()

    def close(self):
        self.azel_thread_running = False
        self.service_thread_running = False

    def set_azel(self,azimuth,elevation):
        """ Send an Azimuth/Elevation move command to PSTRotator """

        # Sanity check inputs.
        if elevation > 90.0:
            elevation = 90.0
        elif elevation < 0.0:
            elevation = 0.0

        if azimuth > 360.0:
            azimuth = azimuth % 360.0

        # Generate command
        pst_command = "<PST><TRACK>0</TRACK><AZIMUTH>%.1f</AZIMUTH><ELEVATION>%.1f</ELEVATION></PST>" % (azimuth,elevation)
        logging.debug("Sent command: %s" % pst_command)
        # Send!
        udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        udp_socket.sendto(pst_command.encode('ascii'), (self.hostname,self.port))
        udp_socket.close()

        return True

    def poll_azel(self):
        """ Poll PSTRotator for an Azimuth/Elevation Update """
        az_poll_command = "<PST>AZ?</PST>"
        el_poll_command = "<PST>EL?</PST>"
        try:
            udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            udp_socket.sendto(az_poll_command.encode('ascii'), (self.hostname,self.port))
            time.sleep(0.2)
            udp_socket.sendto(el_poll_command.encode('ascii'), (self.hostname,self.port))
            udp_socket.close()
        except:
            pass
                    
    def service_loop(self):
        while self.service_thread_running:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.bind(('127.0.0.1', self.service_port))
                s.listen()
                conn, addr = s.accept()
                with conn:
                    print('Connected by', addr)
                    while True:
                        data = conn.recv(1024)
                        if not data:
                            break
                        logging.debug("Rotctl Sent: %s" % data)
                        conn.sendall(data)
            
    def azel_poll_loop(self):
        while self.azel_thread_running:
            self.poll_azel()
            #logging.debug("Poll sent to PSTRotator.")
            time.sleep(self.poll_rate)

    def azel_rx_loop(self):
        """ Listen for Azimuth and Elevation reports from PSTRotator"""
        s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
        s.settimeout(1)
        s.bind(('',(self.port+1)))
        logging.debug("Started PST Rotator Listener Thread.")
        while self.azel_thread_running:
            try:
                m = s.recvfrom(512)
            except socket.timeout:
                m = None
            
            if m != None:
                # Attempt to parse Azimuth / Elevation
                #logging.debug("Received: %s" % m[0])
                data = m[0].decode('ascii')
                if data[:2] == 'EL':
                    if data[3:]:
                        #logging.debug("Empty EL string.")
                        pass
                    else:
                        self.current_elevation = float(data[3:])
                elif data[:2] == 'AZ':
                    if data[3:]:
                        #logging.debug("Empty AZ string.")
                        pass
                    else:
                        self.current_azimuth = float(data[3:])

        
        logging.debug("Closing UDP Listener")
        s.close()


    def get_azel(self):
        return (self.current_azimuth, self.current_elevation)

if __name__ == "__main__":
    logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=logging.DEBUG)

    rot = PSTRotator(hostname="192.168.0.208", poll_rate=10)

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(('192.168.0.208', 4533))
    sock.listen(1)
    while True:
        conn, addr = sock.accept()
        try:
            logging.debug("Connected by %s", addr)
            while True:
                data = conn.recv(1024)
                if not data:
                    break
                for line in data.splitlines():
                    #logging.debug("Received: %s", line)
                    if line.startswith('\dump_state'):                
                        logging.debug("dump_state: %s" % line)
                        dump_state = "10"+"\n"+"11"+"\n"+"0"+"\n"+"359"+"\n"+"0"+"\n"+"90"+"\n" 
                        conn.sendall(dump_state)
                    elif line.startswith('P'):
                        logging.debug("Set Position: %s" % line)
                        pos = line.split(' ')
                        rot.current_azimuth = pos[1]
                        rot.current_elevation = pos[2]
                        conn.sendall("RPRT 0\n")
                        rot.set_azel(float(pos[1]), float(pos[2]))
                        #rot.set_azel(120, 20)
                    elif line.startswith('p'):
                        logging.debug("Get Position: %s" % line)
                        if rot.current_azimuth == None:
                            rot.current_azimuth = "0"
                        if rot.current_elevation == None:
                            rot.current_elevation = "0"                        
                        current_pos = rot.current_azimuth + "\n" + rot.current_elevation
                        logging.debug("Reporting: %s", current_pos)
                        conn.sendall(current_pos + "\n")
                    else:
                        logging.debug("Other.. %s" % line)
        except:
            pass
        finally:
            conn.close()
4 Likes