/* * Dassault Aviation confidential * Copyright (c) 2019 Dassault Aviation. All rights reserved. * * This file is part of the protoframework project. * * NOTICE: All information contained herein is, and remains the property of Dassault Aviation. * The intellectual and technical concepts contained herein are proprietary to Dassault Aviation, * and are protected by trade secret or copyright law. * Unauthorized copying of this file, via any medium, is strictly prohibited. * It can not be copied and/or distributed, in source or binary form, without the express permission of * Dassault Aviation. */ package org.da.testcustom.python; import java.io.File; import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.MalformedURLException; import java.net.SocketException; import java.net.URL; import java.net.UnknownHostException; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.StringTokenizer; import java.util.TimerTask; import java.util.TreeMap; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.da.protoframework.conf.Configuration; import org.da.protoframework.conf.Logging; import org.da.protoframework.model.core.Application; import org.da.protoframework.model.core.Data; import org.da.protoframework.model.core.Datas; import org.da.protoframework.model.core.FrameworkException; import org.da.protoframework.model.core.Module; import org.da.protoframework.model.core.ResponseServiceInstance; import org.da.protoframework.model.core.ServiceInstance; import org.da.protoframework.model.def.CyclicInterfaceDefinition; import org.da.protoframework.model.types.ArrayType; import org.da.protoframework.model.types.EnumType; import org.da.protoframework.model.types.MapType; import org.da.protoframework.model.types.SimpleType; import org.da.protoframework.model.types.StructType; import org.da.protoframework.model.types.Type; import org.da.protoframework.scripts.ExceptionWindowConfiguration; import org.da.protoframework.scripts.ScriptExceptionWrapper; import org.da.protoframework.util.RuntimeState; import org.json.JSONArray; import org.json.JSONObject; import org.mdiutil.io.FileUtilities; import org.mdiutil.io.SocketUtilities; import org.mdiutil.io.StreamGobbler; /** * Represent a Custom Python Module. * * @since 0.6.12 */ public class CustomPythonModule extends Module { private static final int ARGS_COUNT = 17; private static final Pattern PYTHON_DIR = Pattern.compile("[A-Za-z_]+(\\d+)"); private static final Charset UTF8 = Charset.forName("UTF-8"); /** * The Python scripts expected version. */ public static final String VERSION = "1.2"; private String runtimePath = null; private String modulePath = null; private String moduleCallPath = null; private Process process = null; private Thread pythonThreadListener = null; private final List> inputServices = new ArrayList<>(); private ServiceInstance cyclicService = null; private CustomPythonModuleDefinition pythonDef = null; private File pythonScript = null; private String pythonStart = null; private String pythonINIT = null; private String pythonSend = "%send"; private String pythonSubscribe = "%subscribe"; private final List> outputServices = new ArrayList<>(); private Map> services = null; private ArrayBlockingQueue queue; private DatagramSocket receptionSocket = null; private DatagramSocket sendingSocket = null; private int sendingPort = -1; private int receivingPort = -1; private volatile InetAddress addrs = null; private volatile boolean connected = true; private Thread receptionThread = null; // the locks must not be static else the first Python module will be the only one executed private final Object LOCK = new Object(); private final Object LOCK2 = new Object(); private int waitAtStart = -1; private int waitAtInit = -1; private int wait = 10; private final String sep; private volatile boolean endedWithError = false; private volatile boolean isStarted = false; private volatile boolean isInitialized = false; private volatile boolean waitingElapsed = false; private FutureTask startTask = null; private FutureTask initTask = null; private boolean isGreaterThan27 = false; private boolean supportJSON = false; private final boolean isDeployed = Configuration.getInstance().isDeployed(); private final PythonExceptionManager exceptionManager = new PythonExceptionManager(); private static final String ERROR_ALREADY_BOUND = "Address already in use: Cannot bind"; /** * Constructor. * * @param appli the parent Application * @param name the Module name */ public CustomPythonModule(Application appli, String name) { super(appli, name); pythonDef = (CustomPythonModuleDefinition) def; getPythonRuntimePath(); this.modulePath = def.getPath(); sep = pythonDef.getSeparator(); } /** * Return the Module definition. * * @return the Module definition */ @Override public CustomPythonModuleDefinition getDefinition() { return pythonDef; } private void getPythonRuntimePath() { URL runtimeURL = (URL) pythonDef.getFactory().getFrameworkProperty("thePythonRuntime"); if (runtimeURL != null) { this.runtimePath = runtimeURL.getPath(); runtimePath = FileUtilities.replaceEscapedSequences(runtimePath); runtimePath = runtimePath.substring(1); checkPythonVersion(); } } private void checkPythonVersion() { File file = new File(runtimePath); String parentName = file.getParentFile().getName(); Matcher m = PYTHON_DIR.matcher(parentName); if (m.matches()) { int version = Integer.parseInt(m.group(1)); if (version >= 27) { isGreaterThan27 = true; } } } /** * Return the Module class. * * @return the Module class */ @Override protected Class getModuleClass() { return getClass(); } /** * Return the Module instance. * * @return the Module instance */ @Override public Object getModuleInstance() { return this; } private String getArgument(ServiceInstance service) { boolean isSubscriber = service.isSubscriber(); String entryPoint = pythonDef.getEntryPoint(service.getName()); if (entryPoint == null) { entryPoint = "-"; } StringBuilder buf = new StringBuilder(); buf.append(service.getName()).append("="); String isSubscriberS = isSubscriber ? "True" : "False"; buf.append(isSubscriberS).append(";").append(entryPoint).append(";"); Datas datas = service.getDatas(); Map> map = new TreeMap<>(datas.getDatas()); Iterator> it = map.values().iterator(); while (it.hasNext()) { Data data = it.next(); String stype = getType(data.getType()); buf.append(data.getName()).append(":").append(stype); if (it.hasNext()) { buf.append(";"); } } return buf.toString(); } private String getType(Type type) { if (type instanceof SimpleType) { SimpleType stype = (SimpleType) type; return getSimpleTypeName(stype); } else if (type instanceof ArrayType) { ArrayType atype = (ArrayType) type; Type eltType = atype.getType(); if (eltType instanceof SimpleType) { return getType(eltType) + "[]"; } else { return null; } } else if (type instanceof StructType) { StructType stype = (StructType) type; StringBuilder buf = new StringBuilder(); buf.append(type.getName()).append("{}"); Iterator it = stype.getFieldsList().iterator(); while (it.hasNext()) { StructType.Field field = it.next(); Type fieldType = field.getType(); if (fieldType instanceof SimpleType) { String name = getSimpleTypeName((SimpleType) fieldType); buf.append(".").append(field.getName()).append("=").append(name); } } return buf.toString(); } else { return null; } } private String getSimpleTypeName(SimpleType stype) { if (stype.getBaseType() == Type.BOOL) { return "bool"; } else if (stype.isIntegerType()) { return "int"; } else if (stype.isDecimalType()) { return "float"; } else if (stype.getBaseType() == Type.STRING) { return "string"; } else if (stype.getBaseType() == Type.CHAR) { return "char"; } else if (stype instanceof EnumType) { return "int"; } else { return null; } } /** * Called when the {@link org.da.protoframework.model.core.Module#init()} method of this Module is called. * * @param module the Module (remark: this is this Module) */ public void initMethod(Module module) { services = new TreeMap<>(getServices()); Iterator> it = services.values().iterator(); while (it.hasNext()) { ServiceInstance service = it.next(); if (service.isSubscriber()) { inputServices.add(service); } else { outputServices.add(service); if (service.getDefinition() instanceof CyclicInterfaceDefinition) { cyclicService = service; } } } initMethodImpl(); } private boolean createSockets() { try { this.receptionSocket = new DatagramSocket(receivingPort); this.sendingSocket = new DatagramSocket(); addrs = InetAddress.getByName("localhost"); return true; } catch (SocketException | UnknownHostException ex) { if (ex.getMessage().equals(ERROR_ALREADY_BOUND)) { Logging.error("Ended Python module " + this.toString() + ", Python script socket already bound"); } else { Logging.error(ex); } return false; } } /** * Escape the separator. The escaped string is a list of numbers (each corresponding to the Unicode value of the character in decimal) separated by * ";" characters. */ private String escapeChar() { StringBuilder buf = new StringBuilder(); boolean isFirst = true; for (int i = 0; i < sep.length(); i++) { char c = sep.charAt(i); if (!isFirst) { buf.append(";"); } else { isFirst = false; } buf.append(Integer.toString((int) c)); } return buf.toString(); } /** * Remove the PYTHONHOME environment property if it exists. * * @param pb the ProcessBuilder */ private void updatePythonHome(ProcessBuilder pb) { pb.environment().remove("PYTHONHOME"); } private void processEnded() { process = null; pythonThreadListener = null; if (startTask != null) { startTask.cancel(true); } if (initTask != null) { initTask.cancel(true); } Logging.error("Ended Python module " + this.toString() + ", Python script disconnected"); endMethod(); } private boolean checkPythonRuntime() { if (runtimePath != null) { File file = new File(runtimePath); exceptionManager.setRuntimeDirectory(file); RuntimeState state = PythonUtils.checkPythonRuntime(file); if (state == RuntimeState.ERROR_RUNTIME_NOT_FOUND) { Logging.error(this, "Python runtime not found"); return false; } else if (state == RuntimeState.ERROR_RUNTIME_NOT_COMPATIBLE) { Logging.error(this, "Python runtime not compatible with architecture"); return false; } return true; } else { return false; } } /** * Called for the Module initialization. */ private void initMethodImpl() { boolean isCompatible = checkPythonRuntime(); if (!isCompatible) { return; } sendingPort = pythonDef.getSendingPort(); if (sendingPort == -1) { sendingPort = SocketUtilities.getFreePort(); } receivingPort = pythonDef.getReceivingPort(); if (receivingPort == -1) { receivingPort = SocketUtilities.getFreePort(); } String sendingPortS = "" + sendingPort; String receivingPortS = "" + receivingPort; String sendingSize = "" + pythonDef.getSendingSize(); String receivingSize = "" + pythonDef.getReceivingSize(); // the process directory must be the parent file of the python script URL parentURL = pythonDef.getDirectory(); pythonScript = FileUtilities.getAbsoluteFile(new File(parentURL.getFile()), modulePath); File processDir = pythonScript.getParentFile(); try { parentURL = FileUtilities.getProtectedURL(processDir); } catch (MalformedURLException e) { } URL moduleURL = FileUtilities.getChildURL(parentURL, "pythonModule.py"); moduleCallPath = FileUtilities.replaceEscapedSequences(moduleURL.getPath()); moduleCallPath = moduleCallPath.substring(1); String[] args; // arguments = runtime path + module path + ports (4) + input service + output services int argsLength = ARGS_COUNT + inputServices.size() + outputServices.size(); args = new String[argsLength]; // first argument = Python executable args[0] = runtimePath; // argument 1 = Python file to execute args[1] = moduleCallPath; // argument 2 = python modules version args[2] = VERSION; // argument 3 = Python to Java port args[3] = sendingPortS; // argument 4 = Java to Python port args[4] = receivingPortS; // argument 5 = Python to Java socket size args[5] = sendingSize; // argument 6 = Java to Python socket size args[6] = receivingSize; URL appliURL = FileUtilities.getChildURL(parentURL, modulePath + ".py"); File appliFile = new File(appliURL.getFile()); String appliName = FileUtilities.getFileNameBody(appliFile); // get the name of the python class // we use this simple rule for the class name String appliPath = appliName.substring(0, 1).toUpperCase() + appliName.substring(1); // argument 7 = Python class location // // the argument is: : // - the name of the python module is the name of the python file without the .py extension // - the name of the python class if the class which will be used in this module args[7] = appliName + "." + appliPath; // argument 8 = cyclic service name if (cyclicService != null) { args[8] = cyclicService.getName(); } else { args[8] = "-"; } // argument 9 = init method if (pythonINIT != null) { args[9] = pythonINIT; } else { args[9] = "-"; } // argument 10 = start method if (pythonStart != null) { args[10] = pythonStart; } else { args[10] = "-"; } // argument 11 = send method if (pythonSend != null) { args[11] = pythonSend; } else { args[11] = "-"; } // argument 12 = subscribe method if (pythonSubscribe != null) { args[12] = pythonSubscribe; } else { args[12] = "-"; } // argument 13 = separator args[13] = escapeChar(); // argument 14 = Python warnings behavior args[14] = Boolean.toString(Configuration.getInstance().isIgnoringPythonWarnings()); // argument 15: number of input services args[15] = "" + inputServices.size(); // argument 16: number of output services args[16] = "" + outputServices.size(); boolean initOK = true; if (!inputServices.isEmpty()) { // argument 17 to end of input services characteristics for (int i = 0; i < inputServices.size(); i++) { String arg = getArgument(inputServices.get(i)); if (arg == null) { Logging.error("Null argument " + i + " when calling Python, abort the Python module initialization"); initOK = false; break; } args[ARGS_COUNT + i] = arg; } if (!initOK) { return; } } if (!outputServices.isEmpty()) { int argIndex = ARGS_COUNT + inputServices.size(); // argument to end of input services characteristics for (int i = 0; i < outputServices.size(); i++) { String arg = getArgument(outputServices.get(i)); if (arg == null) { Logging.error("Null argument " + i + " when calling Python, abort the Python module initialization"); initOK = false; break; } args[argIndex + i] = arg; } if (!initOK) { return; } } waitAtStart = getDefinition().getWaitAtStartMS(); waitAtInit = getDefinition().getWaitAtInitMS(); wait = getDefinition().getWaitMS(); queue = new ArrayBlockingQueue<>(getDefinition().getQueueSize()); ProcessBuilder pb = new ProcessBuilder(args); //pb.inheritIO(); pb.directory(processDir); // Remove the PYTHONHOME environment property if it exists updatePythonHome(pb); try { boolean isOK = createSockets(); if (!isOK) { return; } this.startSendThread(); // start the Python process process = pb.start(); redirectProcessIO(); pythonThreadListener = new Thread(new ProcessListener(process)); pythonThreadListener.start(); ExecutorService service = Executors.newSingleThreadExecutor(); TimerTask startDelayTask; if (waitAtStart > 0) { // wait for the Python start startDelayTask = new TimerTask() { @Override public void run() { waitingElapsed = true; if (startTask != null) { startTask.cancel(true); runtimeError("Could not start Python Script"); } } }; } else { startDelayTask = null; } startTask = new FutureTask<>(new Runnable() { @Override public void run() { try { while (connected && !isStarted) { synchronized (LOCK2) { byte[] receiveData = new byte[1024]; DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length); receptionSocket.receive(receivePacket); String label = new String(receivePacket.getData(), "UTF-8").trim(); if (!isStarted && label.trim().equals("STARTED") || label.trim().equals("STARTED_JSON") || label.trim().equals("START_ERROR")) { isStarted = true; if (startDelayTask != null) { startDelayTask.cancel(); } if (label.trim().equals("START_ERROR")) { endedWithError = true; runtimeError("Unable to find the Initialization method"); } else if (label.trim().equals("STARTED_JSON")) { supportJSON = true; } break; } } } } catch (IOException ex) { Logging.error(ex); } } }, null); if (startDelayTask != null) { this.schedule(startDelayTask, waitAtStart); } service.execute(startTask); try { startTask.get(); } catch (InterruptedException | ExecutionException ex) { // do nothing, because this should only happen when the framework is shutdown } catch (CancellationException ex) { // do nothing, because this should only happen when we wait too much } if (pythonINIT != null && !endedWithError && !waitingElapsed) { TimerTask initDelayTask; // wait for the Python initialization if (waitAtInit > 0) { initDelayTask = new TimerTask() { @Override public void run() { waitingElapsed = true; if (initTask != null) { initTask.cancel(true); runtimeError("Could not initialize Python Script"); } } }; } else { initDelayTask = null; } initTask = new FutureTask<>(new Runnable() { @Override public void run() { try { while (connected && !isInitialized) { synchronized (LOCK2) { byte[] receiveData = new byte[1024]; DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length); receptionSocket.receive(receivePacket); String label = new String(receivePacket.getData(), "UTF-8").trim(); if (!isInitialized && label.trim().equals("INIT") || label.trim().equals("INIT_ERROR")) { isInitialized = true; if (initDelayTask != null) { initDelayTask.cancel(); } if (label.trim().equals("INIT_ERROR")) { endedWithError = true; runtimeError("Unable to find the Initialization method"); } break; } } } } catch (IOException ex) { Logging.error(ex); } } }, null); if (initDelayTask != null) { this.schedule(initDelayTask, waitAtInit); } service.execute(initTask); queue.offer("INIT"); try { initTask.get(); } catch (InterruptedException | ExecutionException ex) { // do nothing, because this should only happen when the framework is shutdown } catch (CancellationException ex) { // do nothing, because this should only happen when we wait too much } } else { isInitialized = true; } if (isInitialized && !endedWithError && !waitingElapsed) { receptionThread = new Thread("receptionThread") { @Override public void run() { byte[] receiveData = new byte[1024]; DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length); while (connected) { synchronized (LOCK2) { try { receptionSocket.receive(receivePacket); String label = new String(receivePacket.getData(), "UTF-8").trim(); if (label.equals("START_ERROR")) { connected = false; runtimeError("Disconnected Python Module, exception in start Script"); } else { ServiceInstance service = decodeReceivedLabel(label); service.invoke(); } } catch (IOException ex) { runtimeError("Reception or Sending problem"); } } } } }; receptionThread.start(); } else { connected = false; if (endedWithError) { Logging.error(this, "Disconnected Python Module, could not find Python method"); } else { Logging.error(this, "Disconnected Python Module, timed-out"); } if (process != null) { process.destroyForcibly(); } } } catch (IOException ex) { Logging.error(this, ex); } } private void write(String line, boolean isError) { if (isError) { if (!isDeployed) { System.err.println(line); } if (Configuration.getInstance().isDebuggingScripts()) { ExceptionWindowConfiguration exConf = ExceptionWindowConfiguration.getInstance(); if (!exConf.isExceptionWindowShown()) { boolean emit = exceptionManager.addLine(line); if (emit) { ScriptExceptionWrapper wrapper = exceptionManager.getExceptionWrapper(); exConf.showExceptionWindow(wrapper.getFile(), wrapper.getLineNumber(), wrapper); } } } if (line.startsWith("Traceback")) { Logging.error(this, line); } else { Logging.error(line); } } else { if (!isDeployed) { System.out.println(line); } Logging.info(this, line); } } private void redirectProcessIO() { StreamGobbler errorgobbler = new StreamGobbler(process.getErrorStream()); errorgobbler.setListener(new StreamGobbler.Listener() { @Override public void readLine(String string) { write(string, true); } @Override public void close() { } }); StreamGobbler outgobbler = new StreamGobbler(process.getInputStream()); outgobbler.setListener(new StreamGobbler.Listener() { @Override public void readLine(String string) { write(string, false); } @Override public void close() { } }); errorgobbler.start(); outgobbler.start(); } private void runtimeError(String message) { Logging.error(this, message); } private void startSendThread() { Thread sendThread = new Thread("t1") { @Override public void run() { try { while (connected) { synchronized (LOCK) { String str = queue.poll(); if (str != null) { byte[] bytes = str.getBytes(); DatagramPacket sendPacket = new DatagramPacket(bytes, bytes.length, addrs, sendingPort); if (sendingSocket != null) { sendingSocket.send(sendPacket); } Thread.sleep(wait); } } } queue.clear(); } catch (IOException | InterruptedException ex) { runtimeError("Reception or Sending problem"); } } }; sendThread.start(); } /** * Called when the {@link org.da.protoframework.model.core.Module#start()} method of this Module is called. */ public void startMethod() { if (pythonStart != null) { queue.offer("START"); } } private void decodeValues(ServiceInstance service, String content) { int offset = 0; while (true) { int indexVar = content.indexOf(":", offset); if (indexVar == -1) { break; } String varName = content.substring(offset, indexVar); if (content.charAt(indexVar + 1) == '{') { int endOfStruct = content.indexOf("}", offset); Data data = service.getData(varName); if (data == null) { Logging.error(this, "Incorrect Python message, Data " + varName + " does not exist for Service " + service.getName()); } else { Type type = data.getType(); if (!(type instanceof StructType)) { Logging.error(this, "Incorrect Python message, Data " + varName + " is not a Structure for Service " + service.getName()); } else { StructType structType = (StructType) type; String value = content.substring(indexVar + 2, endOfStruct - 1); decodeStructure(structType, (Data.Structure) data, value); } } offset = endOfStruct + 1; } else if (content.charAt(indexVar + 1) == '[') { int endOfArray = content.indexOf("]", offset); Data data = service.getData(varName); if (data == null) { Logging.error(this, "Incorrect Python message, Data " + varName + " does not exist for Service " + service.getName()); } else { Type type = data.getType(); if (!(type instanceof ArrayType)) { Logging.error(this, "Incorrect Python message, Data " + varName + " is not an Array for Service " + service.getName()); } else { ArrayType arrayType = (ArrayType) type; String value = content.substring(indexVar + 2, endOfArray - 1); decodeArray(arrayType, (Data.Array) data, value); } } offset = endOfArray + 1; } else { if (varName.startsWith(sep.substring(sep.length() - 1))) { varName = varName.substring(1); } int endOfVar = content.indexOf(sep, indexVar); String value; if (endOfVar == -1) { value = content.substring(indexVar + 1); } else { value = content.substring(indexVar + 1, endOfVar); } Data data = service.getData(varName); Type type = data.getType(); if (type instanceof SimpleType) { SimpleType stype = (SimpleType) type; if (stype.isIntegerType()) { try { int v = Integer.parseInt(value); data.setIntValue(v); } catch (NumberFormatException e) { Logging.error("Incorrect Python message, value " + value + " for data " + varName + " incorrect for Service " + service.getName()); } } else if (stype.isDecimalType()) { try { float f = Float.parseFloat(value); data.setFloatValue(f); } catch (NumberFormatException e) { Logging.error(this, "Incorrect Python message, value " + value + " for data " + varName + " incorrect for Service " + service.getName()); } } else if (stype.getBaseType() == Type.BOOL) { boolean b = value.equalsIgnoreCase("true"); data.setBooleanValue(b); } else if (stype.getBaseType() == Type.STRING) { String decodedString = decodeString(value); data.setStringValue(decodedString); } } else { Logging.error(this, "Incorrect Python message syntax: " + content); } if (endOfVar == -1) { break; } else { offset = endOfVar + 1; } } } } private void decodeArray(ArrayType arrayType, Data.Array adata, String value) { Type type = arrayType.getType(); if (type instanceof SimpleType) { int index = 0; SimpleType stype = (SimpleType) type; StringTokenizer tok = new StringTokenizer(value, sep); while (tok.hasMoreTokens()) { String tk = tok.nextToken(); if (stype.isIntegerType()) { try { int v = Integer.parseInt(tk); adata.setIntValue(v, index); } catch (NumberFormatException e) { Logging.error(this, "Incorrect Python message, value " + value + " incorrect for Array " + adata.getName()); } } else if (stype.isDecimalType()) { try { float f = Float.parseFloat(tk); adata.setFloatValue(f, index); } catch (NumberFormatException e) { Logging.error(this, "Incorrect Python message, value " + value + " incorrect for Array " + adata.getName()); } } else if (stype.getBaseType() == Type.BOOL) { boolean b = tk.equalsIgnoreCase("true"); adata.setBooleanValue(b, index); } else if (stype.getBaseType() == Type.STRING) { String decodedString = decodeString(tk); adata.setStringValue(decodedString, index); } index++; } } } private void decodeStructure(StructType structType, Data.Structure sdata, String value) { StringTokenizer tok = new StringTokenizer(value, sep); while (tok.hasMoreTokens()) { String tk = tok.nextToken(); int fieldIndex = tk.indexOf("="); String fieldName = tk.substring(0, fieldIndex); String fieldValue = tk.substring(fieldIndex + 1); if (structType.hasField(fieldName)) { Type fieldType = structType.getField(fieldName).getType(); if (fieldType instanceof SimpleType) { SimpleType stype = (SimpleType) fieldType; if (stype.isIntegerType()) { try { int v = Integer.parseInt(fieldValue); sdata.setFieldIntValue(fieldName, v); } catch (NumberFormatException e) { Logging.error(this, "Incorrect Python message, value " + value + " for field " + fieldName + " incorrect for Structure " + sdata.getName()); } } else if (stype.isDecimalType()) { try { float f = Float.parseFloat(fieldValue); sdata.setFieldFloatValue(fieldName, f); } catch (NumberFormatException e) { Logging.error(this, "Incorrect Python message, value " + value + " for field " + fieldName + " incorrect for Structure " + sdata.getName()); } } else if (stype.getBaseType() == Type.BOOL) { boolean b = fieldValue.equalsIgnoreCase("true"); sdata.setFieldBooleanValue(fieldName, b); } else if (stype.getBaseType() == Type.STRING) { String decodedString = decodeString(fieldValue); sdata.setFieldValue(fieldName, decodedString); } } } } } private String decodeString(String value) { if (isGreaterThan27) { byte[] b = HexaDecoder.decode(value); String str = new String(b, UTF8); return str; } else { return value; } } private String encodeString(String value) { if (isGreaterThan27) { byte[] b = value.getBytes(UTF8); String str = HexaDecoder.encode(b); return str; } else { return value; } } private ServiceInstance decodeReceivedLabel(String label) { if (supportJSON) { return decodeReceivedLabelJSON(label); } else { return decodeReceivedLabelRaw(label); } } private ServiceInstance decodeReceivedLabelJSON(String label) { JSONObject serviceJSON = new JSONObject(label); String serviceName = serviceJSON.getString("service"); ServiceInstance service = services.get(serviceName); JSONObject datasJSON = serviceJSON.getJSONObject("datas"); Iterator it = datasJSON.keys(); while (it.hasNext()) { String dataName = it.next(); Data data = service.getData(dataName); Type type = data.getType(); if (type instanceof StructType) { StructType struct = (StructType) type; JSONObject listJSON = datasJSON.getJSONObject(dataName); Object o = getValue(struct, listJSON); data.setValue(o); } else if (type instanceof ArrayType) { ArrayType array = (ArrayType) type; JSONArray listJSON = datasJSON.getJSONArray(dataName); Object o = getValue(array, listJSON); data.setValue(o); } else if (type instanceof MapType) { MapType map = (MapType) type; JSONObject mapJSON = datasJSON.getJSONObject(dataName); Object o = getValue(map, mapJSON); data.setValue(o); } else { data.setValue(datasJSON.get(dataName)); } } return service; } private Object getValue(Type type, JSONArray jsonArray) { if (type instanceof ArrayType) { ArrayType array = (ArrayType) type; Type eltType = array.getType(); List list = new ArrayList<>(); for (int i = 0; i < jsonArray.length(); i++) { Object o; if (eltType instanceof ArrayType) { JSONArray json = jsonArray.getJSONArray(i); o = getValue(eltType, json); } else if (eltType instanceof StructType) { JSONObject json = jsonArray.getJSONObject(i); o = getValue(eltType, json); } else { o = jsonArray.get(i); } list.add(o); } return list; } else { return null; } } private Object getValue(Type type, JSONObject json) { if (type instanceof StructType) { StructType struct = (StructType) type; List value = struct.getDefaultValue(); for (int i = 0; i < struct.countFields(); i++) { StructType.Field field = struct.getField(i); Object fieldValue; Type fieldType = field.getType(); if (fieldType instanceof StructType) { JSONObject jsonField = json.getJSONObject(field.getName()); fieldValue = getValue(fieldType, jsonField); } else if (fieldType instanceof ArrayType) { JSONArray jsonField = json.getJSONArray(field.getName()); fieldValue = getValue(fieldType, jsonField); } else { fieldValue = json.get(field.getName()); } struct.setFieldValue(field.getName(), value, fieldValue); } return value; } else if (type instanceof MapType) { MapType map = (MapType) type; Type keyType = map.getKeyType(); Type valueType = map.getType(); Map myMap = new HashMap<>(); Iterator it = json.keys(); while (it.hasNext()) { String key = it.next(); Object value; if (valueType instanceof StructType) { JSONObject jsonValue = json.getJSONObject(key); value = getValue(valueType, jsonValue); } else if (valueType instanceof ArrayType) { JSONArray jsonValue = json.getJSONArray(key); value = getValue(valueType, jsonValue); } else { value = filterNumericValue(json.get(key), valueType); } Object keyValue = filterMapKeyValue(key, keyType); myMap.put(keyValue, value); } return myMap; } else { return null; } } private Object filterMapKeyValue(String value, Type type) { if (type instanceof SimpleType) { SimpleType stype = (SimpleType) type; if (stype.isNumericType()) { short baseType = stype.getBaseType(); try { if (baseType == Type.DOUBLE) { return Double.parseDouble(value); } else if (baseType == Type.FLOAT) { return Float.parseFloat(value); } else if (baseType == Type.LONG) { return Long.parseLong(value); } else if (baseType == Type.INT) { return Integer.parseInt(value); } else if (baseType == Type.SHORT) { return Short.parseShort(value); } else if (baseType == Type.BYTE) { return Byte.parseByte(value); } else { return value; } } catch (NumberFormatException e) { return 0; } } else { return value; } } else { return value; } } private Object filterNumericValue(Object value, Type type) { if (value instanceof Number && type instanceof SimpleType) { Number number = (Number) value; SimpleType stype = (SimpleType) type; if (stype.isNumericType()) { short baseType = stype.getBaseType(); if (baseType == Type.DOUBLE) { return number.doubleValue(); } else if (baseType == Type.FLOAT) { return number.floatValue(); } else if (baseType == Type.LONG) { return number.longValue(); } else if (baseType == Type.INT) { return number.intValue(); } else if (baseType == Type.SHORT) { return number.shortValue(); } else if (baseType == Type.BYTE) { return number.byteValue(); } else { return value; } } else { return value; } } else { return value; } } private ServiceInstance decodeReceivedLabelRaw(String label) { int idx = label.indexOf('='); if (idx != -1 && label.length() > idx + 1) { String serviceName = label.substring(0, idx).trim(); String content = label.substring(idx + 1); if (services.containsKey(serviceName)) { ServiceInstance service = services.get(serviceName); if (!service.isProvidingData() || service instanceof ResponseServiceInstance) { Logging.error(this, "Incorrect Python message, " + serviceName + " not supported"); } else { decodeValues(service, content); return service; } } else { Logging.error(this, "Incorrect Python message, " + serviceName + " does not exist"); } } else { Logging.error(this, "Incorrect Python message syntax: " + label); } return null; } /** * Called when the Module is notified from a Service. * * @param service the ServiceInstance */ public void subscribeMethod(ServiceInstance service) { if (sendingSocket != null) { if (supportJSON) { subscribeMethodJSON(service); } else { subscribeMethodRaw(service); } } } /** * Called when the Module is notified from a Service. The content is send to Python in JSON format. * * @param service the ServiceInstance */ private void subscribeMethodJSON(ServiceInstance service) { JSONObject json = new JSONObject(); JSONObject jsonDatas = new JSONObject(); Datas datas = service.getDatas(); Iterator> it = datas.getDatas().values().iterator(); while (it.hasNext()) { Data data = it.next(); if (data.hasChanged()) { Object value = getValue(data); jsonDatas.put(data.getName(), value); } } json.put("service", service.getName()); json.put("datas", jsonDatas); String content = json.toString(); queue.offer(content); } private Object getValue(Data data) { return getValue(data.getType(), data.getValue()); } private Object getValue(Type type, Object value) { if (type instanceof StructType) { StructType struct = (StructType) type; Map dict = new HashMap<>(); for (int i = 0; i < struct.countFields(); i++) { StructType.Field field = struct.getField(i); Object fieldValue = struct.getFieldValue(field.getName(), value); Type fieldType = field.getType(); fieldValue = getValue(fieldType, fieldValue); dict.put(field.getName(), fieldValue); } return dict; } else if (type instanceof MapType) { MapType map = (MapType) type; Type valueType = map.getType(); Map value0 = (Map) value; Map dict = new HashMap<>(); Iterator it = value0.keySet().iterator(); while (it.hasNext()) { Object key0 = it.next(); String key1 = key0.toString(); Object eltValue = value0.get(key0); eltValue = getValue(valueType, eltValue); dict.put(key1, eltValue); } return dict; } else if (type instanceof ArrayType) { ArrayType array = (ArrayType) type; List list = new ArrayList<>(); List _in = (List) value; for (int i = 0; i < _in.size(); i++) { Object o = _in.get(i); o = getValue(array.getType(), o); list.add(o); } return list; } else { return value; } } /** * Called when the Module is notified from a Service. The content is send to Python in raw format. * * @param service the ServiceInstance */ private void subscribeMethodRaw(ServiceInstance service) { StringBuilder buf = new StringBuilder(); buf.append(service.getName()).append("="); Datas datas = service.getDatas(); boolean addSep = false; Iterator> it = datas.getDatas().values().iterator(); while (it.hasNext()) { Data data = it.next(); if (data.hasChanged()) { if (addSep) { buf.append(sep); } String strValue = encodeValue(data); buf.append(data.getName()).append(":").append(strValue); addSep = true; } } queue.offer(buf.toString()); } /** * Called when the Module is notified for invoking a Service. * * @param service the ServiceInstance */ public void sendMethod(ServiceInstance service) { if (sendingSocket != null) { StringBuilder buf = new StringBuilder(); buf.append("send[").append(service.getName()).append("]"); queue.offer(buf.toString()); } } private String encodeValue(Data data) { Type type = data.getType(); if (type instanceof SimpleType) { SimpleType sType = (SimpleType) type; if (sType.isIntegerType()) { return data.getValueAsString(); } else if (sType.isDecimalType()) { return data.getValueAsString(); } else if (sType.getBaseType() == Type.CHAR) { return Integer.toString(data.getValueAsInt()); } else if (sType.getBaseType() == Type.STRING) { String encodedString = encodeString(data.getValueAsString()); return encodedString; } else if (sType.getBaseType() == Type.BOOL) { boolean b = data.getValueAsBoolean(); return b ? "True" : "False"; } else { return "0"; } } else if (type instanceof StructType) { StructType struct = (StructType) type; StringBuilder buf = new StringBuilder(); buf.append("{"); Data.Structure dataStruct = (Data.Structure) data; Iterator it = struct.getFieldsList().iterator(); while (it.hasNext()) { StructType.Field field = it.next(); Type fieldType = field.getType(); String valueAsString = "0"; if (fieldType instanceof SimpleType) { SimpleType sType = (SimpleType) fieldType; if (sType.isIntegerType()) { valueAsString = dataStruct.getFieldValueAsString(field.getName()); } else if (sType.isDecimalType()) { valueAsString = dataStruct.getFieldValueAsString(field.getName()); } else if (sType.getBaseType() == Type.CHAR) { valueAsString = dataStruct.getFieldValueAsString(field.getName()); } else if (sType.getBaseType() == Type.STRING) { valueAsString = dataStruct.getFieldValueAsString(field.getName()); valueAsString = encodeString(valueAsString); } else if (sType.getBaseType() == Type.BOOL) { boolean b = dataStruct.getFieldValueAsBoolean(field.getName()); valueAsString = b ? "True" : "False"; } } buf.append(field.getName()).append("=").append(valueAsString).append(sep); } buf.append("}"); return buf.toString(); } else if (type instanceof ArrayType) { ArrayType array = (ArrayType) type; Type eltType = array.getType(); if (eltType instanceof SimpleType) { SimpleType sType = (SimpleType) eltType; Data.Array dataArray = (Data.Array) data; StringBuilder buf = new StringBuilder(); buf.append("["); for (int i = 0; i < dataArray.countElements(); i++) { String valueAsString = "0"; if (sType.isIntegerType()) { valueAsString = dataArray.getValueAsString(i); } else if (sType.isDecimalType()) { valueAsString = dataArray.getValueAsString(i); } else if (sType.getBaseType() == Type.CHAR) { valueAsString = dataArray.getValueAsString(i); } else if (sType.getBaseType() == Type.STRING) { valueAsString = dataArray.getValueAsString(i); valueAsString = encodeString(valueAsString); } else if (sType.getBaseType() == Type.BOOL) { boolean b = dataArray.getValueAsBoolean(i); valueAsString = b ? "True" : "False"; } buf.append(valueAsString).append(sep); } buf.append("]"); return buf.toString(); } else { return "0"; } } else { return "0"; } } /** * Called when the {@link org.da.protoframework.model.core.Module#shutdown()} method of this Module is called. */ public void endMethod() { connected = false; if (process != null) { process.destroyForcibly(); } } /** * Create the Module instance. * * @throws FrameworkException */ @Override public void create() throws FrameworkException { try { if (appli.isReified()) { String path = def.getPath(); if (path == null) { throw new FrameworkException("Impossible to create module, no path found"); } initMethod = getInitPreConfMethod("initMethod"); if (def.getInitEntryPoint() != null) { pythonINIT = def.getInitEntryPoint(); } startMethod = getStartEndMethod("startMethod"); if (def.getStartEntryPoint() != null) { pythonStart = def.getStartEntryPoint(); } startDelay = def.getStartEntryPointDelay(); if (def.getSendEntryPoint() != null) { pythonSend = def.getSendEntryPoint(); } if (def.getReceiveEntryPoint() != null) { pythonSubscribe = def.getReceiveEntryPoint(); } receiveMethod = getMethod("subscribeMethod"); sendMethod = getMethod("sendMethod"); endMethod = getStartEndMethod("endMethod"); } setupServices(); } catch (NoSuchMethodException ex) { throw new FrameworkException("Impossible to create module", ex); } } /** * Listen to the framework process. */ public class ProcessListener implements Runnable { private final Process _process; private ProcessListener(Process process) { this._process = process; } @Override public void run() { try { _process.waitFor(); processEnded(); } catch (InterruptedException ex) { processEnded(); } } } }