#####################################################################
# #
# /__init__.py #
# #
# Copyright 2013, Monash University #
# #
# This file is part of the program runmanager, in the labscript #
# suite (see http://labscriptsuite.org), and is licensed under the #
# Simplified BSD License. See the license.txt file in the root of #
# the project for the full license. #
# #
#####################################################################
import itertools
import os
import sys
import random
import time
import subprocess
import types
import threading
import traceback
import datetime
import errno
import json
import tokenize
import io
import labscript_utils.h5_lock
import h5py
import numpy as np
from labscript_utils.ls_zprocess import ProcessTree, zmq_push_multipart
from labscript_utils.labconfig import LabConfig
process_tree = ProcessTree.instance()
from .__version__ import __version__
def _ensure_str(s):
    """Convert bytestrings and numpy strings to Python strings.

    ``bytes`` instances are decoded with the default codec; any other object is
    passed through ``str()``.
    """
    if isinstance(s, bytes):
        return s.decode()
    return str(s)
def is_valid_python_identifier(name):
    """Return True if ``name`` tokenizes as exactly one Python NAME token.

    Strings containing any whitespace, strings that fail to tokenize, and strings
    that produce zero or several tokens all yield False.
    """
    # No whitespace allowed. An actual newline in the source cannot easily be told
    # apart from the NEWLINE token the tokenizer emits even when the string has no
    # newline character, and NEWLINE tokens are ignored below, so whitespace has to
    # be rejected up front.
    if "".join(name.split()) != name:
        return False
    ignored = (tokenize.NEWLINE, tokenize.ENDMARKER)
    try:
        token_types = [
            token[0]
            for token in tokenize.generate_tokens(io.StringIO(name).readline)
            if token[0] not in ignored
        ]
    except tokenize.TokenError:
        return False
    # Valid only if a single NAME token remains:
    return token_types == [tokenize.NAME]
class ExpansionError(Exception):
    """Raised when a parsing failure is caused by a mismatch with the expansion mode.

    Having a dedicated exception class lets error handling code tell this case apart
    from other exceptions raised while parsing globals.
    """
class TraceDictionary(dict):
    """A dict that can record which keys are read through item access.

    While a trace is active, every key passed to ``__getitem__`` is appended once,
    in first-access order, to ``trace_data``. The key is recorded before the lookup,
    so keys that turn out to be missing are recorded too.
    """

    def __init__(self, *args, **kwargs):
        # None means no trace is in progress.
        self.trace_data = None
        super().__init__(*args, **kwargs)

    def start_trace(self):
        """Begin a fresh trace, discarding any trace already in progress."""
        self.trace_data = []

    def __getitem__(self, key):
        tracing = self.trace_data is not None
        if tracing and key not in self.trace_data:
            self.trace_data.append(key)
        return super().__getitem__(key)

    def stop_trace(self):
        """End the trace and return the recorded keys (None if no trace was active)."""
        recorded, self.trace_data = self.trace_data, None
        return recorded
def new_globals_file(filename):
    """Create the HDF5 file ``filename`` (opened in 'w' mode) containing a single,
    empty top-level group named 'globals'."""
    with h5py.File(filename, 'w') as h5file:
        h5file.create_group('globals')
def add_expansion_groups(filename):
    """Backward compatibility, for globals files which don't have expansion groups.

    Creates an 'expansion' subgroup in every group of ``filename`` that lacks one,
    initialises each global's expansion setting to a blank string, and then guesses
    the expansion settings from the datatypes of the evaluated globals, if possible.
    Does nothing if every group already has an 'expansion' subgroup.
    """
    # DEPRECATED
    # Don't open in write mode unless we have to:
    with h5py.File(filename, 'r') as h5file:
        all_groups = h5file['globals']
        requires_expansion_group = [
            groupname for groupname in all_groups if 'expansion' not in all_groups[groupname]
        ]
    if not requires_expansion_group:
        return
    # Read the names of each group's globals before reopening the file for writing:
    group_globalslists = [
        get_globalslist(filename, groupname) for groupname in requires_expansion_group
    ]
    with h5py.File(filename, 'a') as h5file:
        for groupname, globalslist in zip(requires_expansion_group, group_globalslists):
            expansion_group = h5file['globals'][groupname].create_group('expansion')
            # Initialise all expansion settings to blank strings:
            for name in globalslist:
                expansion_group.attrs[name] = ''
    # Evaluate every global in the file and guess its expansion from the result.
    # Failed evaluations come back as exception objects rather than being raised.
    groups = {group_name: filename for group_name in get_grouplist(filename)}
    evaled_globals, _, _ = evaluate_globals(get_globals(groups), raise_exceptions=False)
    for group_name, group_values in evaled_globals.items():
        for global_name, value in group_values.items():
            set_expansion(filename, group_name, global_name, guess_expansion_type(value))
def get_grouplist(filename):
    """Return a list of the names of the groups under 'globals' in ``filename``."""
    # For backward compatibility, add 'expansion' settings to this globals file if
    # it doesn't contain any, guessing the expansion settings where possible.
    # DEPRECATED
    add_expansion_groups(filename)
    with h5py.File(filename, 'r') as h5file:
        # The file closes when this block exits, so the group names must be
        # materialised into a list of strings while it is still open:
        return list(h5file['globals'].keys())
def new_group(filename, groupname):
    """Create the group ``groupname`` under 'globals' in ``filename``, together with
    its empty 'units' and 'expansion' subgroups.

    Raises Exception if a group with that name already exists.
    """
    with h5py.File(filename, 'a') as h5file:
        all_groups = h5file['globals']
        if groupname in all_groups:
            raise Exception('Can\'t create group: target name already exists.')
        created = all_groups.create_group(groupname)
        for subgroup_name in ('units', 'expansion'):
            created.create_group(subgroup_name)
def copy_group(source_globals_file, source_groupname, dest_globals_file, delete_source_group=False):
    """Copy the group ``source_groupname`` from ``source_globals_file`` to
    ``dest_globals_file``, renaming the copy so that there is no name collision.

    If ``dest_globals_file`` is None or equal to ``source_globals_file``, the copy
    is made within the source file. While the candidate name already exists in the
    destination, a suffix is appended to it: '_copy' on the first attempt when
    ``delete_source_group`` is False, and '(i)' with an increasing counter on every
    other attempt. This function does not itself delete the source group;
    ``delete_source_group`` only affects the naming.

    Returns the name of the new group. Raises Exception if the source group does
    not exist.
    """
    with h5py.File(source_globals_file, 'a') as source_f:
        # Check that the source group exists:
        if source_groupname not in source_f['globals']:
            raise Exception('Can\'t copy there is no group "{}"!'.format(source_groupname))

        # Are we copying from one file to another?
        copying_between_files = (
            dest_globals_file is not None and source_globals_file != dest_globals_file
        )
        # Only open a second file handle if the destination is a different file:
        dest_f = h5py.File(dest_globals_file, 'a') if copying_between_files else source_f
        try:
            # Rename the group until there are no name collisions:
            i = 0 if not delete_source_group else 1
            dest_groupname = source_groupname
            while dest_groupname in dest_f['globals']:
                if i > 0:
                    dest_groupname = "{}({})".format(dest_groupname, i)
                else:
                    dest_groupname = "{}_copy".format(dest_groupname)
                i += 1

            # Copy the group:
            dest_f.copy(source_f['globals'][source_groupname], '/globals/%s' % dest_groupname)
        finally:
            # Close the separately opened destination file even if the copy failed,
            # so that the handle is not leaked. The source file is closed by the
            # enclosing with statement.
            if copying_between_files:
                dest_f.close()

    return dest_groupname
def rename_group(filename, oldgroupname, newgroupname):
    """Rename the group ``oldgroupname`` in ``filename`` to ``newgroupname`` by
    copying it to the new name and deleting the original.

    Does nothing if the two names are equal. Raises Exception if a group named
    ``newgroupname`` already exists.
    """
    if oldgroupname == newgroupname:
        # No rename!
        return
    with h5py.File(filename, 'a') as h5file:
        all_groups = h5file['globals']
        if newgroupname in all_groups:
            raise Exception('Can\'t rename group: target name already exists.')
        h5file.copy(all_groups[oldgroupname], '/globals/%s' % newgroupname)
        del all_groups[oldgroupname]
def delete_group(filename, groupname):
    """Delete the group ``groupname`` from the 'globals' group of ``filename``."""
    with h5py.File(filename, 'a') as h5file:
        all_groups = h5file['globals']
        del all_groups[groupname]
def get_globalslist(filename, groupname):
    """Return a dict copy of the attributes of the group ``groupname`` in
    ``filename``, keyed by attribute name."""
    with h5py.File(filename, 'r') as h5file:
        group_attrs = h5file['globals'][groupname].attrs
        # The file closes when this block exits, so the attrs must be copied into a
        # plain dict while it is still open:
        return dict(group_attrs)
def new_global(filename, groupname, globalname):
    """Create the global ``globalname`` in group ``groupname`` of ``filename``, with
    its value, units and expansion all set to the empty string.

    Raises ValueError if ``globalname`` is not a valid Python variable name, and
    Exception if a global with that name already exists in the group.
    """
    if not is_valid_python_identifier(globalname):
        raise ValueError('%s is not a valid Python variable name'%globalname)
    with h5py.File(filename, 'a') as h5file:
        group = h5file['globals'][groupname]
        if globalname in group.attrs:
            raise Exception('Can\'t create global: target name already exists.')
        group.attrs[globalname] = ''
        for subgroup_name in ('units', 'expansion'):
            group[subgroup_name].attrs[globalname] = ''
def rename_global(filename, groupname, oldglobalname, newglobalname):
    """Rename the global ``oldglobalname`` in group ``groupname`` of ``filename`` to
    ``newglobalname``, carrying over its value, units and expansion.

    Does nothing if the two names are equal. Raises ValueError if ``newglobalname``
    is not a valid Python variable name, and Exception if a global with the new
    name already exists in the group.
    """
    if oldglobalname == newglobalname:
        # No rename!
        return
    if not is_valid_python_identifier(newglobalname):
        raise ValueError('%s is not a valid Python variable name'%newglobalname)
    # Read the existing settings before opening the file for writing:
    value = get_value(filename, groupname, oldglobalname)
    units = get_units(filename, groupname, oldglobalname)
    expansion = get_expansion(filename, groupname, oldglobalname)
    subgroup_names = ('units', 'expansion')
    with h5py.File(filename, 'a') as h5file:
        group = h5file['globals'][groupname]
        if newglobalname in group.attrs:
            raise Exception('Can\'t rename global: target name already exists.')
        # Write everything under the new name first...
        group.attrs[newglobalname] = value
        for subgroup_name, setting in zip(subgroup_names, (units, expansion)):
            group[subgroup_name].attrs[newglobalname] = setting
        # ...then remove the entries stored under the old name:
        del group.attrs[oldglobalname]
        for subgroup_name in subgroup_names:
            del group[subgroup_name].attrs[oldglobalname]
def get_value(filename, groupname, globalname):
    """Return, as a Python string, the value stored for the global ``globalname`` in
    group ``groupname`` of ``filename``."""
    with h5py.File(filename, 'r') as h5file:
        stored = h5file['globals'][groupname].attrs[globalname]
        # Replace numpy strings with python unicode strings.
        # DEPRECATED, for backward compat with old files
        return _ensure_str(stored)
def set_value(filename, groupname, globalname, value):
    """Store ``value`` as the value of the global ``globalname`` in group
    ``groupname`` of ``filename``."""
    with h5py.File(filename, 'a') as h5file:
        group = h5file['globals'][groupname]
        group.attrs[globalname] = value
def get_units(filename, groupname, globalname):
    """Return, as a Python string, the units stored for the global ``globalname`` in
    group ``groupname`` of ``filename``."""
    with h5py.File(filename, 'r') as h5file:
        stored = h5file['globals'][groupname]['units'].attrs[globalname]
        # Replace numpy strings with python unicode strings.
        # DEPRECATED, for backward compat with old files
        return _ensure_str(stored)
def set_units(filename, groupname, globalname, units):
    """Store ``units`` as the units of the global ``globalname`` in group
    ``groupname`` of ``filename``."""
    with h5py.File(filename, 'a') as h5file:
        units_group = h5file['globals'][groupname]['units']
        units_group.attrs[globalname] = units
def get_expansion(filename, groupname, globalname):
    """Return, as a Python string, the expansion setting stored for the global
    ``globalname`` in group ``groupname`` of ``filename``."""
    with h5py.File(filename, 'r') as h5file:
        stored = h5file['globals'][groupname]['expansion'].attrs[globalname]
        # Replace numpy strings with python unicode strings.
        # DEPRECATED, for backward compat with old files
        return _ensure_str(stored)
def set_expansion(filename, groupname, globalname, expansion):
    """Store ``expansion`` as the expansion setting of the global ``globalname`` in
    group ``groupname`` of ``filename``."""
    with h5py.File(filename, 'a') as h5file:
        expansion_group = h5file['globals'][groupname]['expansion']
        expansion_group.attrs[globalname] = expansion
def delete_global(filename, groupname, globalname):
    """Delete the global ``globalname`` from group ``groupname`` of ``filename``.

    The global's value attribute is removed, along with its entries in the group's
    'units' and 'expansion' subgroups where those exist, so that no stale settings
    are left behind for a global that no longer exists. Raises KeyError (from h5py)
    if the global's value attribute does not exist.
    """
    with h5py.File(filename, 'a') as f:
        group = f['globals'][groupname]
        del group.attrs[globalname]
        # new_global() and rename_global() keep the value, units and expansion
        # entries in step; do the same on deletion. The guards tolerate files in
        # which a subgroup or entry is missing.
        for subgroup_name in ('units', 'expansion'):
            if subgroup_name in group and globalname in group[subgroup_name].attrs:
                del group[subgroup_name].attrs[globalname]
def guess_expansion_type(value):
    """Guess the expansion setting for an evaluated global: 'outer' for numpy arrays
    and lists, and the empty string for anything else."""
    return u'outer' if isinstance(value, (np.ndarray, list)) else u''
def iterator_to_tuple(iterator, max_length=1000000):
    """Return the elements of ``iterator`` as a tuple.

    Raises ValueError as soon as more than ``max_length`` elements have been drawn
    from the iterator.
    """
    # We want to prevent infinite length tuples, but we cannot know in advance
    # whether the iterator is infinite. So consume it element by element and give
    # up once it has yielded one element more than max_length:
    collected = []
    for element in iterator:
        collected.append(element)
        if len(collected) == max_length + 1:
            message = ('This iterator is very long, possibly infinite. ' +
                       'Runmanager cannot create an infinite number of shots. ' +
                       'If you really want an iterator longer than %d, ' % max_length +
                       'please modify runmanager.iterator_to_tuple and increase max_length.')
            raise ValueError(message)
    return tuple(collected)
def get_all_groups(h5_files):
    """Return a dictionary of group_name: h5_path pairs from a list of h5_files.

    A single path given as ``str`` or ``bytes`` is treated as a one-element list.
    Raises ValueError if the same group name occurs in more than one file.
    """
    if isinstance(h5_files, (bytes, str)):
        h5_files = [h5_files]
    groups = {}
    for path in h5_files:
        for group_name in get_grouplist(path):
            if group_name in groups:
                message = ('Error: group %s is defined in both %s and %s. ' % (group_name, groups[group_name], path) +
                           'Only uniquely named groups can be used together '
                           'to make a run file.')
                raise ValueError(message)
            groups[group_name] = path
    return groups
def get_globals(groups):
    """Take a dictionary of group_name: h5_file pairs and pull the globals out of
    the groups in their files.

    The globals are strings storing Python expressions at this point. They are
    packed into a new dictionary, keyed by group_name, whose values are dictionaries
    of the form {global_name: (expression, units, expansion), ...}.
    """
    sequence_globals = {}
    # Visit each distinct file once, reading all of its requested groups:
    for filepath in set(groups.values()):
        groups_from_this_file = [name for name, path in groups.items() if path == filepath]
        with h5py.File(filepath, 'r') as h5file:
            for group_name in groups_from_this_file:
                globals_group = h5file['globals'][group_name]
                values = dict(globals_group.attrs)
                units = dict(globals_group['units'].attrs)
                expansions = dict(globals_group['expansion'].attrs)
                # Replace numpy strings with python unicode strings.
                # DEPRECATED, for backward compat with old files
                sequence_globals[group_name] = {
                    global_name: (
                        _ensure_str(value),
                        _ensure_str(units[global_name]),
                        _ensure_str(expansions[global_name]),
                    )
                    for global_name, value in values.items()
                }
    return sequence_globals
def evaluate_globals(sequence_globals, raise_exceptions=True):
    """Evaluate a dictionary of globals as returned by get_globals.

    The globals are unevaluated strings. They are all evaluated in the same
    namespace so that the expressions can refer to each other, iterating so that
    NameErrors can be resolved by subsequently defined globals. If a pass leaves the
    same number of errors as the previous one, an exception is raised containing the
    messages of all exceptions that failed to be resolved. If raise_exceptions is
    False, any evaluation resulting in an exception instead has the exception object
    stored in the results dictionary.

    Returns a tuple (results, global_hierarchy, expansions): results has the same
    group structure as sequence_globals and holds the evaluated values,
    global_hierarchy maps a global's name to the list of other globals it looked up
    during evaluation, and expansions maps each global's name to its expansion
    setting.
    """
    # Flatten all the groups into one dictionary of {global_name: expression}
    # pairs, and create the group structure of the results dict, which has the same
    # structure as sequence_globals.
    all_globals = {}
    expansions = {}
    global_hierarchy = {}
    # Pre-fill the results dictionary with groups. This is needed for storing
    # exceptions when globals with the same name are defined in multiple groups
    # (all of them get the exception):
    results = {group_name: {} for group_name in sequence_globals}
    multiply_defined_globals = set()
    for group_name, group_globals in sequence_globals.items():
        for global_name, (expression, _units, expansion) in group_globals.items():
            if global_name in all_globals:
                # The same global is defined twice. Either raise an exception, or
                # store the exception for each place it is defined, depending on
                # whether raise_exceptions is True:
                groups_with_same_global = [
                    other_group_name
                    for other_group_name, other_globals in sequence_globals.items()
                    if global_name in other_globals
                ]
                exception = ValueError('Global named \'%s\' is defined in multiple active groups:\n ' % global_name +
                                       '\n '.join(groups_with_same_global))
                if raise_exceptions:
                    raise exception
                for other_group_name in groups_with_same_global:
                    results[other_group_name][global_name] = exception
                multiply_defined_globals.add(global_name)
            all_globals[global_name] = expression
            expansions[global_name] = expansion

    # Do not attempt to evaluate globals which are multiply defined:
    for global_name in multiply_defined_globals:
        del all_globals[global_name]

    # Eval the expressions in the same namespace as each other. A TraceDictionary is
    # used as that namespace to track which globals another global depends on.
    # NOTE: expressions are run through eval(), so globals files must be trusted.
    evaled_globals = {}
    sandbox = TraceDictionary()
    exec('from pylab import *', sandbox, sandbox)
    exec('from runmanager.functions import *', sandbox, sandbox)
    globals_to_eval = all_globals.copy()
    previous_errors = -1
    while globals_to_eval:
        errors = []
        # Iterate over a snapshot, since entries are removed as they succeed:
        for global_name, expression in list(globals_to_eval.items()):
            # Start the trace to determine which globals this global depends on:
            sandbox.start_trace()
            try:
                value = eval(compile(expression, '<string>', 'eval'), sandbox)
                # Need to know the length of any generators, convert to tuple:
                if isinstance(value, types.GeneratorType):
                    value = iterator_to_tuple(value)
                # If this value is to be outer-producted, make sure that it can be
                # iterated over:
                if expansions[global_name] == 'outer':
                    try:
                        iter(value)
                    except Exception as e:
                        raise ExpansionError(str(e))
            except Exception as e:
                # Don't raise, just collect the error; they are all reported later.
                errors.append((global_name, e))
                sandbox.stop_trace()
                continue
            # Put the global into the namespace so other globals can use it:
            sandbox[global_name] = value
            del globals_to_eval[global_name]
            evaled_globals[global_name] = value
            # Get the results from the trace, keeping only the names of globals
            # (not other functions):
            dependencies = [key for key in sandbox.stop_trace() if key in all_globals]
            if dependencies:
                global_hierarchy[global_name] = dependencies
        if len(errors) == previous_errors:
            # Since some globals may refer to others, some NameErrors are expected.
            # There should be fewer of them on each pass of this while loop, as the
            # globals that are required become defined. If there are not fewer
            # errors, then something else is wrong and it should be reported.
            if raise_exceptions:
                message = 'Error parsing globals:\n' + ''.join(
                    '%s: %s: %s\n' % (global_name, exception.__class__.__name__, str(exception))
                    for global_name, exception in errors
                )
                raise Exception(message)
            for global_name, exception in errors:
                evaled_globals[global_name] = exception
            break
        previous_errors = len(errors)

    # Assemble results into a dictionary of the same format as sequence_globals:
    for group_name, group_globals in sequence_globals.items():
        group_results = results[group_name]
        for global_name in group_globals:
            # Do not override exception objects already stored as the result of
            # multiply defined globals:
            if global_name not in group_results:
                group_results[global_name] = evaled_globals[global_name]
    return results, global_hierarchy, expansions
def expand_globals(sequence_globals, evaled_globals, expansion_config=None, return_dimensions=False):
    """Expand iterable globals according to their expansion settings.

    Creates a number of 'axes' which are to be outer product'ed together. Some of
    these axes have only one element; these are globals that do not vary. Some have
    a set of globals being zipped together, iterating in lock-step. Others contain a
    single global varying across its values (the globals set to 'outer' expansion).
    Globals whose evaluated value is an Exception instance are left out.

    Args:
        sequence_globals: nested dict {group: {global: (expression, units,
            expansion)}}, as returned by get_globals.
        evaled_globals: nested dict of the same structure holding the evaluated
            values, as returned by evaluate_globals.
        expansion_config: optional dict mapping an axis name ('zip <zip key>' or
            'outer <global name>') to a dict that may contain 'order' (sort key
            giving the axis' position in the outer product) and 'shuffle' (whether
            to randomise the axis). Axes not mentioned get order -1 and are not
            shuffled. Entries naming an axis that does not exist are ignored.
        return_dimensions: if True, also return a dict of axis name -> axis length.

    Returns:
        A list of shots, each element of which is a dictionary of that shot's
        globals; or the tuple (shots, dimensions) if return_dimensions is True.
    """
    if expansion_config is None:
        order = {}
        shuffle = {}
    else:
        order = {k: v['order'] for k, v in expansion_config.items() if 'order' in v}
        shuffle = {k: v['shuffle'] for k, v in expansion_config.items() if 'shuffle' in v}

    # Flatten the group structure into {global_name: value / expansion} dicts:
    values = {}
    expansions = {}
    for group_name, group_globals in sequence_globals.items():
        for global_name, (_expression, _units, expansion) in group_globals.items():
            values[global_name] = evaled_globals[group_name][global_name]
            expansions[global_name] = expansion

    # Get the set of zip keys in use ('outer' is handled separately below):
    zip_keys = set(expansions.values())
    zip_keys.discard('outer')

    axes = {}
    global_names = {}
    dimensions = {}
    for zip_key in zip_keys:
        axis = []
        zip_global_names = []
        for global_name, expansion in expansions.items():
            if expansion != zip_key:
                continue
            value = values[global_name]
            if isinstance(value, Exception):
                continue
            if not zip_key:
                # Wrap up non-iterating globals (with zip_key = '') in a
                # one-element list. When zipped and then outer product'ed, this
                # gives the result we want:
                value = [value]
            axis.append(value)
            zip_global_names.append(global_name)
        axis_name = 'zip ' + zip_key
        axes[axis_name] = list(zip(*axis))
        dimensions[axis_name] = len(axes[axis_name])
        global_names[axis_name] = zip_global_names

    # Give each global being outer-product'ed its own axis. It is zipped with
    # itself alone so that it is in the same format as the zipped globals, ready
    # for outer-producting together:
    for global_name, expansion in expansions.items():
        if expansion != 'outer':
            continue
        value = values[global_name]
        if isinstance(value, Exception):
            continue
        axis_name = 'outer ' + global_name
        axes[axis_name] = list(zip(value))
        dimensions[axis_name] = len(axes[axis_name])
        global_names[axis_name] = [global_name]

    # Add defaults for any axes missing from the configuration:
    for axis_name in axes:
        order.setdefault(axis_name, -1)
        shuffle.setdefault(axis_name, False)

    # Shuffle relevant axes:
    for axis_name, axis_values in axes.items():
        if shuffle[axis_name]:
            random.shuffle(axis_values)

    # Sort axes and global names by order. expansion_config may mention axes that
    # do not exist in this set of globals; skip those, since an axis of None would
    # make the outer product below fail.
    sorted_axis_names = [key for key in sorted(order, key=order.get) if key in axes]
    axes = [axes[key] for key in sorted_axis_names]
    # Flattened list of global names, in the same order as the axes:
    global_names = [name for key in sorted_axis_names for name in global_names[key]]

    shots = []
    for axis_values in itertools.product(*axes):
        # axis_values is a tuple of tuples, with the outer level being over the
        # axes. Flatten it to get the individual value of each global, since we no
        # longer care which axis they are on:
        global_values = [value for axis in axis_values for value in axis]
        shots.append(dict(zip(global_names, global_values)))

    if return_dimensions:
        return shots, dimensions
    return shots
def next_sequence_index(shot_basedir, dt, increment=True):
    """Return the next sequence index for sequences in the given base directory
    (i.e. <experiment_shot_storage>/<script_basename>) and the date of the given
    datetime object, and increment the sequence index atomically on disk if
    increment=True.

    If increment is not True, the result is indicative only and may be used by
    other code at any time. One must increment the sequence index prior to use.
    """
    from labscript_utils.ls_zprocess import Lock
    from labscript_utils.shared_drive import path_to_agnostic

    DATE_FORMAT = '%Y-%m-%d'
    # The file where the next sequence index is stored on disk:
    sequence_index_file = os.path.join(shot_basedir, '.next_sequence_index')
    datestr_now = dt.strftime(DATE_FORMAT)
    # Hold a zlock on the file to prevent race conditions with other code:
    with Lock(path_to_agnostic(sequence_index_file), read_only=not increment):
        try:
            with open(sequence_index_file) as f:
                datestr, sequence_index = json.load(f)
        except OSError as exc:
            if exc.errno != errno.ENOENT:
                raise
            # File doesn't exist yet, start from zero
            sequence_index = 0
        else:
            if datestr != datestr_now:
                # New day, start from zero again:
                sequence_index = 0
        if increment:
            # Write the new file with the incremented sequence index
            os.makedirs(os.path.dirname(sequence_index_file), exist_ok=True)
            with open(sequence_index_file, 'w') as f:
                json.dump([datestr_now, sequence_index + 1], f)
        return sequence_index
def new_sequence_details(script_path, config=None, increment_sequence_index=True):
    """Generate the details for a new sequence: the toplevel attrs sequence_date,
    sequence_index, sequence_id; and the output directory and filename prefix for
    the shot files, according to labconfig settings.

    If increment_sequence_index=True, the resulting sequence index is claimed for
    use such that it cannot be used by anyone else. This should be done if the
    sequence details are immediately about to be used to compile a sequence.
    Otherwise, set increment_sequence_index to False, but in that case the results
    are indicative only and one should call this function again with
    increment_sequence_index=True before compiling the sequence, as otherwise the
    sequence_index may be used by other code in the meantime.

    Returns the tuple (sequence_attrs, shot_output_dir, filename_prefix). A new
    LabConfig() is used when config is None.
    """
    if config is None:
        config = LabConfig()
    script_basename, _ = os.path.splitext(os.path.basename(script_path))
    shot_basedir = os.path.join(
        config.get('DEFAULT', 'experiment_shot_storage'), script_basename
    )
    now = datetime.datetime.now()
    sequence_timestamp = now.strftime('%Y%m%dT%H%M%S')
    sequence_index = next_sequence_index(shot_basedir, now, increment_sequence_index)

    # Toplevel attributes to be saved to the shot files:
    sequence_attrs = {
        'script_basename': script_basename,
        'sequence_date': now.strftime('%Y-%m-%d'),
        'sequence_index': sequence_index,
        'sequence_id': sequence_timestamp + '_' + script_basename,
    }

    setting_not_found = (LabConfig.NoOptionError, LabConfig.NoSectionError)

    # Compute the output directory based on labconfig settings:
    try:
        subdir_format = config.get('runmanager', 'output_folder_format')
    except setting_not_found:
        subdir_format = os.path.join('%Y', '%m', '%d', '{sequence_index:05d}')
    # Format the output directory according to the current timestamp, sequence
    # index and sequence_timestamp, if present in the format string:
    subdir = now.strftime(subdir_format).format(
        sequence_index=sequence_index, sequence_timestamp=sequence_timestamp
    )
    shot_output_dir = os.path.join(shot_basedir, subdir)

    # Compute the shot filename prefix according to labconfig settings:
    try:
        filename_prefix_format = config.get('runmanager', 'filename_prefix_format')
    except setting_not_found:
        # Default, for backward compatibility:
        filename_prefix_format = '{sequence_timestamp}_{script_basename}'
    # Format the filename prefix according to the current timestamp, sequence
    # index, sequence_timestamp, and script_basename, if present in the format
    # string:
    filename_prefix = now.strftime(filename_prefix_format).format(
        sequence_index=sequence_index,
        sequence_timestamp=sequence_timestamp,
        script_basename=script_basename,
    )
    return sequence_attrs, shot_output_dir, filename_prefix
def make_run_files(
    output_folder,
    sequence_globals,
    shots,
    sequence_attrs,
    filename_prefix,
    shuffle=False,
):
    """Generate the run files for a sequence, yielding each file's path.

    sequence_globals and shots are of the datatypes returned by get_globals and
    expand_globals: the former is a nested dictionary with string values, and the
    latter a list of flat dictionaries, one per shot. sequence_attrs is a dict of
    the attributes pertaining to this sequence to be initially set at the top-level
    group of the h5 file, as returned by new_sequence_details. output_folder and
    filename_prefix determine the directory shot files will be output to, as well as
    their filenames (this function will generate filenames with the shot number and
    .h5 extension appended to filename_prefix). Sensible defaults for these are also
    returned by new_sequence_details(), so preferably these should be used.

    If shuffle is True, ``shots`` is shuffled in place, randomising the order that
    the run files are generated in with respect to which element of shots they come
    from.

    This function returns a *generator*. The run files are not actually created
    until you loop over this generator (which gives you the filepaths). This is
    useful for not having to clean up as many unused files in the event of failed
    compilation of labscripts. If you want all the run files to be created at some
    point, simply convert the returned generator to a list. An empty ``shots`` list
    yields nothing.
    """
    basename = os.path.join(output_folder, filename_prefix)
    nruns = len(shots)
    # Zero-pad the shot number to the width of the largest index, nruns - 1. This
    # is computed with integer arithmetic rather than log10 so that an empty list
    # of shots does not raise (log10(0) is -inf) and no float rounding is involved.
    ndigits = len(str(max(nruns - 1, 0)))
    if shuffle:
        random.shuffle(shots)
    filename_template = '%s_%0' + str(ndigits) + 'd.h5'
    for i, shot_globals in enumerate(shots):
        runfilename = filename_template % (basename, i)
        make_single_run_file(
            runfilename, sequence_globals, shot_globals, sequence_attrs, i, nruns
        )
        yield runfilename
def make_single_run_file(filename, sequenceglobals, runglobals, sequence_attrs, run_no, n_runs):
    """Create a single run file at ``filename``.

    runglobals is a dict of this run's globals, the format being the same as that of
    one element of the list returned by expand_globals. sequenceglobals is a nested
    dictionary of the type returned by get_globals, or None to store no per-group
    globals. sequence_attrs is a dict of attributes pertaining to this sequence, as
    returned by new_sequence_details. run_no and n_runs must be provided; if this
    run file is part of a sequence, then they should reflect how many run files are
    being generated in this sequence, all of which must have identical
    sequence_attrs.

    The containing directory is created if needed. Raises ValueError if a run
    global cannot be stored as an HDF5 attribute.
    """
    # A bare filename has an empty dirname, which os.makedirs() rejects, so only
    # create the directory when the path actually has one:
    output_dir = os.path.dirname(filename)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with h5py.File(filename, 'w') as f:
        f.attrs.update(sequence_attrs)
        f.attrs['run number'] = run_no
        f.attrs['n_runs'] = n_runs
        f.create_group('globals')
        if sequenceglobals is not None:
            # Store the unevaluated globals, group by group:
            for groupname, groupvars in sequenceglobals.items():
                group = f['globals'].create_group(groupname)
                unitsgroup = group.create_group('units')
                expansiongroup = group.create_group('expansion')
                for name, (value, units, expansion) in groupvars.items():
                    group.attrs[name] = value
                    unitsgroup.attrs[name] = units
                    expansiongroup.attrs[name] = expansion
        # Store this run's evaluated globals as attributes of the 'globals' group:
        for name, value in runglobals.items():
            if value is None:
                # Store it as a null object reference:
                value = h5py.Reference()
            try:
                f['globals'].attrs[name] = value
            except Exception as e:
                message = ('Global %s cannot be saved as an hdf5 attribute. ' % name +
                           'Globals can only have relatively simple datatypes, with no nested structures. ' +
                           'Original error was:\n' +
                           '%s: %s' % (e.__class__.__name__, str(e)))
                # Chain the original exception so its traceback is not lost:
                raise ValueError(message) from e
def make_run_file_from_globals_files(labscript_file, globals_files, output_path, config=None):
    """Create a run file output_path, using all the globals from globals_files.

    labscript_file is used to determine the sequence_attrs only. Raises ValueError
    if the globals expand to more than one shot.
    """
    groups = get_all_groups(globals_files)
    sequence_globals = get_globals(groups)
    evaled_globals, _global_hierarchy, expansions = evaluate_globals(sequence_globals)
    shots = expand_globals(sequence_globals, evaled_globals)
    if len(shots) > 1:
        # Report every global that has a non-empty expansion setting:
        scanning_globals = [name for name, expansion in expansions.items() if expansion]
        raise ValueError('Cannot compile to a single run file: The following globals are a sequence: ' +
                         ', '.join(scanning_globals))
    sequence_attrs, _, _ = new_sequence_details(
        labscript_file, config=config, increment_sequence_index=True
    )
    make_single_run_file(output_path, sequence_globals, shots[0], sequence_attrs, 1, 1)
def compile_labscript(labscript_file, run_file):
    """Compile labscript_file with the run file by running it in a subprocess with
    the current Python interpreter, returning the process's return code, stdout and
    stderr (the latter two as bytes)."""
    command = [sys.executable, labscript_file, run_file]
    completed = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return completed.returncode, completed.stdout, completed.stderr
def compile_labscript_with_globals_files(labscript_file, globals_files, output_path):
    """Create a run file output_path, using all the globals from globals_files, then
    compile labscript_file with that run file, returning the process's return code,
    stdout and stderr."""
    make_run_file_from_globals_files(labscript_file, globals_files, output_path)
    return compile_labscript(labscript_file, output_path)
def compile_labscript_async(labscript_file, run_file, stream_port, done_callback):
    """Compile labscript_file with run_file. This function is designed to be called
    in a thread.

    The stdout and stderr from the compilation will be shovelled into stream_port
    via zmq push as it spews forth, and when compilation is complete, done_callback
    will be called with a boolean argument indicating success. Note that the zmq
    communication will be encrypted, or not, according to security settings in
    labconfig. If you want to receive the data on a zmq socket, do so using a PULL
    socket created from a labscript_utils.ls_zprocess.Context, or using a
    labscript_utils.ls_zprocess.ZMQServer. These subclasses will also be configured
    with the appropriate security settings and will be able to receive the messages.

    Raises RuntimeError((signal, data)) if the compiler subprocess replies with
    anything other than a 'done' signal. The subprocess is told to quit and is
    waited on in either case.
    """
    compiler_path = os.path.join(os.path.dirname(__file__), 'batch_compiler.py')
    to_child, from_child, child = process_tree.subprocess(
        compiler_path, output_redirection_port=stream_port
    )
    try:
        to_child.put(['compile', [labscript_file, run_file]])
        signal, data = from_child.get()
        if signal != 'done':
            raise RuntimeError((signal, data))
    finally:
        # Always shut the compiler subprocess down, including when it sent an
        # unexpected signal, so that it is not left running.
        to_child.put(['quit', None])
        child.communicate()
    # The payload of the 'done' message is the success flag:
    done_callback(data)
def compile_multishot_async(labscript_file, run_files, stream_port, done_callback):
    """Compile labscript_file with each of run_files. This function is designed to
    be called in a thread.

    The stdout and stderr from the compilation will be shovelled into stream_port
    via zmq push as it spews forth, and when each compilation is complete,
    done_callback will be called with a boolean argument indicating success.
    Compilation will stop after the first failure. If you want to receive the data
    on a zmq socket, do so using a PULL socket created from a
    labscript_utils.ls_zprocess.Context, or using a
    labscript_utils.ls_zprocess.ZMQServer. These subclasses will also be configured
    with the appropriate security settings and will be able to receive the messages.

    If an exception occurs, its traceback is pushed to stream_port as stderr, the
    compiler subprocess is shut down, and the exception is re-raised.
    """

    def shut_down_child():
        # Tell the compiler subprocess to exit and wait for it to do so.
        to_child.put(['quit', None])
        child.communicate()

    compiler_path = os.path.join(os.path.dirname(__file__), 'batch_compiler.py')
    to_child, from_child, child = process_tree.subprocess(
        compiler_path, output_redirection_port=stream_port
    )
    try:
        for run_file in run_files:
            to_child.put(['compile', [labscript_file, run_file]])
            # Wait for the 'done' message, discarding any other signals:
            signal, success = from_child.get()
            while signal != 'done':
                signal, success = from_child.get()
            done_callback(success)
            if not success:
                break
    except Exception:
        error = traceback.format_exc()
        zmq_push_multipart(stream_port, data=[b'stderr', error.encode('utf-8')])
        shut_down_child()
        raise
    shut_down_child()
def compile_labscript_with_globals_files_async(labscript_file, globals_files, output_path, stream_port, done_callback):
    """Asynchronous counterpart of compile_labscript_with_globals_files.

    The run file is created at output_path in the calling thread, then a daemon
    thread running compile_labscript_async does the compilation. Nothing is
    returned. Instead, stderr and stdout are put to stream_port via zmq push as
    multipart messages, and when compilation is finished done_callback is called
    with a boolean argument indicating success or failure.

    If creating the run file or starting the thread raises an Exception, the
    traceback is pushed to stream_port as [b'stderr', <utf-8 encoded traceback>] and
    done_callback(False) is invoked from a separate daemon thread.

    If you want to receive the data on a zmq socket, do so using a PULL socket
    created from a labscript_utils.ls_zprocess.Context, or using a
    labscript_utils.ls_zprocess.ZMQServer. These subclasses will also be configured
    with the appropriate security settings and will be able to receive the messages.
    """
    try:
        make_run_file_from_globals_files(labscript_file, globals_files, output_path)
        worker = threading.Thread(
            target=compile_labscript_async,
            args=[labscript_file, output_path, stream_port, done_callback],
            daemon=True,
        )
        worker.start()
    except Exception:
        report = traceback.format_exc().encode('utf-8')
        zmq_push_multipart(stream_port, data=[b'stderr', report])
        # Report failure from another thread, as the success path would.
        notifier = threading.Thread(target=done_callback, args=(False,), daemon=True)
        notifier.start()
def get_shot_globals(filepath):
    """Return the evaluated globals of a shot file, for use by labscript or lyse.

    The values are read from the attributes of the 'globals' group of the HDF5 file
    at filepath. Plain dictionary access to those attributes would be fine, except
    that a few conversions are applied to the values, so it is best to do that in
    one place.

    Returns:
        dict mapping each global's name to its (converted) value.
    """

    def to_python(value):
        # Numpy bools become normal bools.
        if isinstance(value, np.bool_):
            return bool(value)
        # Null HDF references become None.
        if isinstance(value, h5py.Reference) and not value:
            return None
        # Numpy strings and bytestrings become Python strings.
        # DEPRECATED, for backward compat with old files.
        if isinstance(value, np.str_):
            return str(value)
        if isinstance(value, bytes):
            return value.decode()
        return value

    with h5py.File(filepath, 'r') as shot_file:
        return {
            name: to_python(raw)
            for name, raw in shot_file['globals'].attrs.items()
        }
def dict_diff(dict1, dict2):
    """Return the difference between two dictionaries.

    Args:
        dict1: the first dictionary.
        dict2: the second dictionary.

    Returns:
        A dictionary of key: [val1, val2] pairs for keys present in both inputs
        whose values differ. Keys unique to either dictionary are included as
        key: [val1, '-'] or key: ['-', val2]. Differing common keys come first, in
        sorted order where the keys are orderable, followed by the keys unique to
        dict1 and then those unique to dict2, each in their dictionary's order.
    """
    # Intersect the keys as Python objects. Going through a numpy array here would
    # coerce them (tuples get flattened, mixed types become strings), so that the
    # resulting "keys" may no longer exist in either dictionary.
    shared = set(dict1.keys()).intersection(dict2.keys())
    try:
        common_keys = sorted(shared)
    except TypeError:
        # Keys of mutually unorderable types: fall back to dict1's own order.
        common_keys = [key for key in dict1.keys() if key in shared]

    diff = {}
    for key in common_keys:
        val1, val2 = dict1[key], dict2[key]
        if np.iterable(val1) or np.iterable(val2):
            # `!=` on array-likes is elementwise, so compare them as a whole.
            differs = not np.array_equal(val1, val2)
        else:
            differs = val1 != val2
        if differs:
            diff[key] = [val1, val2]
    for key in dict1.keys():
        if key not in shared:
            diff[key] = [dict1[key], '-']
    for key in dict2.keys():
        if key not in shared:
            diff[key] = ['-', dict2[key]]
    return diff
def flatten_globals(sequence_globals, evaluated=False):
    """Merge the per-group globals of sequence_globals into one flat dictionary.

    If evaluated is true, each global's value is copied as is. Otherwise each value
    is expected to be a (value expression, units, expansion) triple and only the
    value expression string is kept. When the same name occurs in several groups,
    the last group iterated wins.
    """
    flat = {}
    for group in sequence_globals.values():
        if evaluated:
            flat.update(group.items())
            continue
        for name, (expression, _units, _expansion) in group.items():
            flat[name] = expression
    return flat
def globals_diff_groups(active_groups, other_groups, max_cols=1000, return_string=True):
    """Given two sets of globals groups, perform a diff of the raw and evaluated
    globals.

    Both sets are loaded with get_globals and evaluated with evaluate_globals
    (with raise_exceptions=False). A global is reported only if its evaluated value
    differs and its expression also differs once comments and whitespace are
    ignored.

    Args:
        active_groups: the groups reported in the 'Current' columns.
        other_groups: the groups reported in the 'Prev' columns.
        max_cols: passed to DataFrame.to_string when formatting the report.
        return_string: if true, return the report text; otherwise print it and
            return the DataFrame.

    Returns:
        The report string if return_string is true. Otherwise a pandas DataFrame
        indexed by global name with columns 'Prev (Eval)', 'Current (Eval)',
        'Prev (Raw)' and 'Current (Raw)'; it is empty when there is nothing to
        report.
    """
    columns = ['Prev (Eval)', 'Current (Eval)', 'Prev (Raw)', 'Current (Raw)']

    our_sequence_globals = get_globals(active_groups)
    other_sequence_globals = get_globals(other_groups)

    # evaluate globals
    our_evaluated_sequence_globals, _, _ = evaluate_globals(our_sequence_globals, raise_exceptions=False)
    other_evaluated_sequence_globals, _, _ = evaluate_globals(other_sequence_globals, raise_exceptions=False)

    # flatten globals dictionaries
    our_globals = flatten_globals(our_sequence_globals, evaluated=False)
    other_globals = flatten_globals(other_sequence_globals, evaluated=False)
    our_evaluated_globals = flatten_globals(our_evaluated_sequence_globals, evaluated=True)
    other_evaluated_globals = flatten_globals(other_evaluated_sequence_globals, evaluated=True)

    # diff the *evaluated* globals
    value_differences = dict_diff(other_evaluated_globals, our_evaluated_globals)

    # We are interested only in displaying globals where *both* the
    # evaluated global *and* its unevaluated expression (ignoring comments
    # and whitespace) differ. This will minimise false positives where a
    # slight change in an expression still leads to the same value, or
    # where an object has a poorly defined equality operator that returns
    # False even when the two objects are identical.
    filtered_differences = {}
    for name, (other_value, our_value) in value_differences.items():
        our_expression = our_globals.get(name, '-')
        other_expression = other_globals.get(name, '-')
        # Strip comments, get tokens so we can diff without being sensitive to comments or whitespace:
        our_expression, our_tokens = remove_comments_and_tokenify(our_expression)
        other_expression, other_tokens = remove_comments_and_tokenify(other_expression)
        if our_tokens != other_tokens:
            filtered_differences[name] = [repr(other_value), repr(our_value), other_expression, our_expression]

    if filtered_differences:
        import pandas as pd
        df = pd.DataFrame.from_dict(filtered_differences, 'index')
        df = df.sort_index()
        df.columns = columns
        payload = df.to_string(max_cols=max_cols) + '\n\n'
    else:
        # No table to build; pandas is only imported below if the caller actually
        # asked for a DataFrame.
        df = None
        payload = 'Evaluated globals are identical to those of selected file.\n'

    if return_string:
        return payload

    print(payload)
    if df is None:
        # Previously this path hit an unassigned `df`. Return an empty frame with
        # the usual columns so the return type is the same either way.
        import pandas as pd
        df = pd.DataFrame(columns=columns)
    return df
def globals_diff_shots(file1, file2, max_cols=100):
    """Print and return the globals diff between two files.

    The globals groups of each file are obtained with get_all_groups, a header
    naming both files is printed, and the comparison is delegated to
    globals_diff_groups with return_string=False, whose result is returned. file1
    supplies the "active" groups and file2 the "other" groups of that comparison.
    """
    # file1's groups first, then file2's.
    groups_file1, groups_file2 = [get_all_groups(path) for path in (file1, file2)]

    print('Globals diff between:\n%s\n%s\n\n' % (file1, file2))
    return globals_diff_groups(
        groups_file1, groups_file2, max_cols=max_cols, return_string=False
    )