Outils pour utilisateurs

Outils du site


kivy_oscpy

Kivy: oscpy

A modern implementation of OSC for python 3
C'est maintenu par les développeurs Kivy, c'est bien fait.

Ce package python fait partie du projet Kivy, mais il peut être utilisé indépendamment.

Ressources

Installation

sudo pip3 install oscpy

Exemple tiré de

Extraits

Il n'est pas possible d'avoir 2 serveurs sur le même port sur une même machine, chaque script à un serveur pour recevoir et un client pour envoyer.

J'ai écrit quelque part que l'OSC ne supporte pas l'UTF-8. Si l'envoi et la réception se fait en python sans utiliser de bibliothèque tierce, il suffit d'encoder (en bytes) le string, ce qui est réalisé dans l'exemple ci-dessous avec

 "à@éèù".encode('utf-8')

et de décoder avec

data.decode('utf-8')

main.py

main.py

class Accelerometer(BoxLayout):
 
    def __init__(self, app):
        super().__init__()
        self.app = app
        Clock.schedule_interval(self.update_display, 1/50)
        ...
 
    def update_display(self, dt):
        root = self.app.get_running_app()
        self.ids.activ_sensor.text = f"Capteur actif: {root.sensor}"
 
    def on_activity(self, act):
        """Appelé par on_release: root.on_activity(3) dans accelerometer.kv"""
 
        r = self.app.get_running_app()
        r.client.send_message(b'/activity', [act])
 
class AccelerometerApp(App):
 
    def build(self):
        ...
        self.server = OSCThreadServer()
        self.server.listen( address=b'localhost',
                            port=3003,
                            default=True)
        ...  
        self.server.bind(b'/sensor', self.on_sensor)
        self.client = OSCClient(b'localhost', 3001)
 
    def on_sensor(self, sens):
        self.sensor = sens.decode('utf-8')

service.py

service.py

...
from oscpy.client import OSCClient
from oscpy.server import OSCThreadServer
...
class AccelerometerService:
    ...
    def init_osc(self):
        """Le serveur peut envoyer
        mais impossible d'avoir 2 serveurs sur le même port.
        """
        self.server = OSCThreadServer()
        self.server.listen('localhost', port=3001, default=True)
        # Les callbacks du serveur
        self.server.bind(b'/activity', self.on_activity)
        self.server.bind(b'/stop', self.on_stop)
        self.server.bind(b'/sensor_enable', self.on_sensor_enable)
        # Un simple client
        self.client = OSCClient(b'localhost', 3003)
    ...
    def on_activity(self, msg):
        print("activity", msg)
        self.activity = int(msg) 
    ...
    def get_acceleration(self):
        if self.status:
            a, b, c = 0,0,0
            ...
            if self.sensor_enabled != 0:
                # Set dans les arrays
                self.acc_x[self.num] = a
                self.acc_y[self.num] = b
                self.acc_z[self.num] = c
                self.acc_act[self.num] = self.activity
                acc_message = [a, b, c, self.activity, self.num]
                self.client.send_message(b'/acc', acc_message)
    ...   
    def run(self):
        while self.loop:
            self.get_acceleration()
            sleep(0.02)
 
if __name__ == '__main__':
    ACCELEROMETER = AccelerometerService()
    ACCELEROMETER.run()            

Bundle

Envoi d'un bundle avec un Client

osc.py
from oscpy.client import OSCClient
 
class OscClient:
 
    def __init__(self, **kwargs):
 
        self.ip = kwargs.get('ip', None)
        self.port = kwargs.get('port', None)
 
        self.client = OSCClient(self.ip, self.port)
 
    def send_depth(self, depth):
        self.client.send_message(b'/depth', [depth])
 
    def send_bundle(self, messages):
        bund = []
 
        for i, msg in enumerate(messages):
            tag = ('/' + str(i)).encode('utf-8')
            print(tag, msg)
            bund.append([tag, msg])
 
        self.client.send_bundle(bund)
 
 
if __name__ == "__main__":
 
    messages = [[3.2, 3, 4], [55.6, 12, 80]]
    cli = OscClient(**{'ip': '127.0.0.1', 'port': 8003})
    cli.send_bundle(messages)
b'/0' [3.2, 3, 4]
b'/1' [55.6, 12, 80]

Réception d'un bundle sur un Server

osc_server_test.py
from time import sleep
from oscpy.server import OSCThreadServer
 
dico = {}
def on_tag(*args):
    print(args)
    tag = int(args[0].decode('utf-8')[1:])
    print(tag)
    dico[tag] = args[1:]
    print(dico)
 
def default_handler(*args):
    print("default_handler", args)
 
server = OSCThreadServer()
server.listen(b'localhost', port=8003, default=True)
server.default_handler = default_handler
 
for i in range(10):
    tag = ('/' + str(i)).encode('utf-8')
    server.bind(tag, on_tag, get_address=True)
 
while 1:
    sleep(0.1)
python3 osc_server_test.py 
(b'/0', 3.200000047683716, 3, 4)
0
{0: (3.200000047683716, 3, 4)}
(b'/1', 55.599998474121094, 12, 80)
1
{0: (3.200000047683716, 3, 4), 1: (55.599998474121094, 12, 80)}
kivy_oscpy.txt · Dernière modification : 2021/08/05 07:30 de serge