from datetime import datetime
import math
import time

# Import the easymorph module. If the script is started by an EasyMorph workflow,
# the import automatically reads the parameters and the input dataset of the
# Call Python action. The module does not have to be installed in this case;
# EasyMorph makes it available to the script via the PYTHONPATH envar.
import easymorph as em

# The easymorph module can be used directly from the command line as well, which we
# refer to as 'standalone' mode. For standalone usage you need to either add
# EasyMorph/Python/modules to PYTHONPATH or install the wheel:
#   pip install --force-reinstall --no-index --find-links="Python/modules" easymorph
# (assuming the current working dir is the EasyMorph installation location)
if em.standalone:
    print("Running in standalone mode; easymorph.input is not available")
else:
    # Print row & column counts
    print(f"Running from workflow: {em.input.length=}, {em.input.width=}")

# The immutable input dataset is provided via easymorph.input.
# A simple way to define a default dataset for standalone mode (when em.input is None):
src: em.Dataset = (
    em.input or em.DatasetBuilder().add_column("count", [1, 2, 3]).to_dataset()
)


def build_dataset():
    # Build a dataset with mixed types and both dict and positional rows
    ds = (
        em.DatasetBuilder()
        .add_rows(
            [
                {
                    "sensor": "S1",
                    "reading": 0.5,
                    "status": "ok",
                    "timestamp": datetime(2025, 1, 1),
                },
                {
                    "sensor": "S2",
                    "reading": None,
                    # The 'status' value is missing: None is inserted and a warning is reported.
                    # "status": "ok",  # uncommenting this will fix the warning
                    "timestamp": None,
                },
                ["S3", 19.8, "ok", datetime(2025, 1, 3)],  # positional row
            ]
        )
        # Columns can be inserted via add_column; values are filled starting from the
        # first row, the rest padded with None. Note that there is a fast path for float arrays.
        .add_column("last_reported", [1.0, math.nan, 2.0])
        .to_dataset()
    )

    # Dataset info
    print("Column names:", [c.name for c in ds.columns])
    print("Row count:", ds.length)

    # Columns can be indexed:
    c0 = ds.columns[0]  # by index
    cs = ds.columns["sensor"]  # by name
    print("ds.columns[0].name:", c0.name, "ds.columns['sensor'].name:", cs.name)

    # First value from column 'status'
    print("ds['status'][0]:", ds.columns["status"][0])

    # Rows are convertible to dict
    print("Last row as dict:", ds.rows[-1].to_dict())
    print("Last row ['reading'] value:", ds.rows[-1]["reading"])

    # Read a few values from columns
    print("sensor[0:2]:", list(cs[0:2]))
    print("reading[all]:", list(ds.columns["reading"]))

    # Column view as numbers (non-numbers are converted to NaN).
    # This is the most efficient way to get numeric data.
    print("Readings as array:", ds.columns["reading"].to_numbers_array())

    # Slice rows
    first_two = ds.rows[:2]
    print("First two statuses:", [r["status"] for r in first_two])

    em.yield_output(ds)

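
# A minimal numeric post-processing sketch. It uses only API shown elsewhere in this
# file (DatasetBuilder, column indexing, to_numbers_array, add_row); the helper name
# 'numeric_summary' is illustrative and not part of the easymorph module.
def numeric_summary():
    ds = em.DatasetBuilder().add_column("reading", [0.5, math.nan, 19.8]).to_dataset()
    # to_numbers_array yields floats with non-numbers converted to NaN,
    # so drop NaN values before aggregating
    finite = [v for v in ds.columns["reading"].to_numbers_array() if not math.isnan(v)]
    mean = sum(finite) / len(finite) if finite else None
    # Return a one-row summary dataset (same pattern as show_params below);
    # call this helper directly, or wire it into 'modes' if desired
    em.yield_output(em.DatasetBuilder().add_row({"count": len(finite), "mean": mean}))
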

# Simple passthrough demo. Note: this is lossy because numbers on the Python side
# are floats, not decimals.
def passthrough():
    # yield_output sets the output of the calling action and can be called only once
    # per script execution.
    # em.yield_output(None) is allowed, to explicitly 'return' an empty dataset.
    em.yield_output(
        src  # em.input can be None if the calling action is set to not pass it
    )


# You can get all workflow parameters by calling em.get_params_as_dict()
def show_params():
    params = em.get_params_as_dict() or {"placeholder": True}
    print("Workflow parameters:", params)
    # Return the params as a dataset
    em.yield_output(em.DatasetBuilder().add_row(params))


# Shows how to read/write a .dset file
def file_operations():
    ds = em.DatasetBuilder().add_column("count", [1, 2, 3]).to_dataset()
    # Save a dataset to a file and read it back
    em.save_dataset(ds, "example.dset")
    em.yield_output(
        # The returned dataset is immutable, just like em.input
        em.read_dataset("example.dset")
    )


# Shows how cancellation can be handled in a long-running script
def cancellation(timeout=20.0):
    start_time = time.time()
    print("- waiting for cancellation (timeout", timeout, "s)")
    while not em.is_cancellation_requested():
        if time.time() - start_time > timeout:
            raise TimeoutError("Example script timed out. Expected cancellation.")
        time.sleep(0.2)
    # At any point where it is logically appropriate, you can check whether the running
    # workflow is requesting cancellation. In standalone mode is_cancellation_requested
    # returns False. A script is not required to raise an exception on cancellation:
    # this is simply one way to handle it.
    if em.is_cancellation_requested():
        raise RuntimeError("Cancelled")


# Warnings can be reported back to the EasyMorph workflow using the em.warn function.
# Up to 128 warnings can be reported.
def warnings_demo():
    em.warn("First warning")
    # Unequal column lengths result in a warning issued by DatasetBuilder
    em.yield_output(
        em.DatasetBuilder()
        .add_column("id", [1, 2])
        .add_column("count", [1, 2, 3])
        .to_dataset()
    )
    # Warnings can be reported even after yield_output.
    # With show_callsite=False the warning will not include call-site information.
    em.warn("Second warning", show_callsite=False)


# Note: in version 1.0.0 there are no dedicated from/to_df functions yet.
# With glue code (fairly slow) there is a way to convert to/from a DataFrame, if needed.
def pandas_interop():
    import pandas as pd

    def dataset_to_df(ds: em.Dataset) -> pd.DataFrame:
        return pd.DataFrame({col.name: list(col) for col in ds.columns})

    def dataset_to_df_numeric(ds: em.Dataset) -> pd.DataFrame:
        # All columns as float, non-numeric -> NaN.
        # This is a much faster option for numeric data.
        return pd.DataFrame({col.name: col.to_numbers_array() for col in ds.columns})

    def df_to_dataset(df: pd.DataFrame) -> em.Dataset:
        b = em.DatasetBuilder()
        for name, series in df.items():
            b.add_column(name, series)
        return b.to_dataset()

    src_df = pd.DataFrame(
        {
            "C1": [1.0, float("nan"), 3.5],
            "C2": [3, None, "c"],
        }
    )
    ds_from_df = df_to_dataset(src_df)
    df_from_ds = dataset_to_df(ds_from_df)
    out_ds = df_to_dataset(df_from_ds)
    em.yield_output(out_ds)


def run_example(mode="show_params"):
    print(f"Running in mode '{mode}'")
    modes = {
        "build_dataset": build_dataset,
        "passthrough": passthrough,
        "cancellation": cancellation,
        "file_operations": file_operations,
        "warnings": warnings_demo,
        "show_params": show_params,
        "pandas_interop": pandas_interop,
    }
    # Validate the mode before calling so that a KeyError raised inside a demo
    # is not mistaken for an unknown mode
    if mode not in modes:
        raise KeyError(f"Unknown mode '{mode}'. Valid modes: {list(modes)}")
    modes[mode]()


# In the workflow, the 'mode' parameter is expected to be set; otherwise 'when_standalone' is used
run_example(em.get_param("mode", when_standalone="show_params"))
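
# Example standalone invocation (a sketch; the file name 'example.py' is assumed,
# and the easymorph module must be importable as described at the top of this file):
#   python example.py
# In standalone mode get_param returns the when_standalone fallback, so the
# 'show_params' demo runs and prints the placeholder parameters.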