Macro scans examples

This chapter consists of a series of examples demonstrating specific features or tricks for programming custom scan macros using the Sardana scan framework.

  1##############################################################################
  2##
  3# This file is part of Sardana
  4##
  5# http://www.sardana-controls.org/
  6##
  7# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
  8##
  9# Sardana is free software: you can redistribute it and/or modify
 10# it under the terms of the GNU Lesser General Public License as published by
 11# the Free Software Foundation, either version 3 of the License, or
 12# (at your option) any later version.
 13##
 14# Sardana is distributed in the hope that it will be useful,
 15# but WITHOUT ANY WARRANTY; without even the implied warranty of
 16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 17# GNU Lesser General Public License for more details.
 18##
 19# You should have received a copy of the GNU Lesser General Public License
 20# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
 21##
 22##############################################################################
 23
"""
    Macro library containing examples demonstrating specific features or tricks
    for programming scan macros for Sardana.

   Available Macros are:
     ascan_demo
     ascanr
     toothedtriangle
     a2scan_mod
     ascanc_demo
     ascan_with_addcustomdata
     ascanct_midtrigger

"""

# Names exported by ``from <this module> import *``.
# NOTE(review): ascanc_demo, ascan_with_addcustomdata and ascanct_midtrigger
# are defined in this module but are not listed here -- confirm whether that
# is intentional.
__all__ = ["ascan_demo", "ascanr", "toothedtriangle",
           "a2scan_mod"]

__docformat__ = 'restructuredtext'
 38
 39import numpy
 40
 41from sardana.macroserver.macro import Macro, Hookable, Type
 42from sardana.macroserver.scan import *
 43
 44
 45class ascan_demo(Macro):
 46    """
 47    This is a basic reimplementation of the ascan` macro for demonstration
 48    purposes of the Generic Scan framework. The "real" implementation of
 49    :class:`sardana.macroserver.macros.ascan` derives from
 50    :class:`sardana.macroserver.macros.aNscan` and provides some extra features.
 51    """
 52
 53    # this is used to indicate other codes that the macro is a scan
 54    hints = {'scan': 'ascan_demo'}
 55    # this hints that the macro requires the ActiveMntGrp environment variable
 56    # to be set
 57    env = ('ActiveMntGrp',)
 58
 59    param_def = [
 60        ['motor',      Type.Moveable, None, 'Motor to move'],
 61        ['start_pos',  Type.Float,    None, 'Scan start position'],
 62        ['final_pos',  Type.Float,    None, 'Scan final position'],
 63        ['nr_interv',  Type.Integer,  None, 'Number of scan intervals'],
 64        ['integ_time', Type.Float,    None, 'Integration time']
 65    ]
 66
 67    def prepare(self, motor, start_pos, final_pos, nr_interv, integ_time, **opts):
 68        # parse the user parameters
 69        self.start = numpy.array([start_pos], dtype='d')
 70        self.final = numpy.array([final_pos], dtype='d')
 71        self.integ_time = integ_time
 72
 73        self.nb_points = nr_interv + 1
 74        self.interv_size = (self.final - self.start) / nr_interv
 75        self.name = 'ascan_demo'
 76        # the "env" dictionary may be passed as an option
 77        env = opts.get('env', {})
 78
 79        # create an instance of GScan (in this case, of its child, SScan
 80        self._gScan = SScan(self, generator=self._generator,
 81                            moveables=[motor], env=env)
 82
 83    def _generator(self):
 84        step = {}
 85        # integ_time is the same for all steps
 86        step["integ_time"] = self.integ_time
 87        for point_no in range(self.nb_points):
 88            step["positions"] = self.start + point_no * \
 89                self.interv_size  # note that this is a numpy array
 90            step["point_id"] = point_no
 91            yield step
 92
 93    def run(self, *args):
 94        for step in self._gScan.step_scan():  # just go through the steps
 95            yield step
 96
 97    @property
 98    def data(self):
 99        return self._gScan.data  # the GScan provides scan data
100
101    def _get_nr_points(self):
102        msg = ("nr_points is deprecated since version 3.0.3. "
103               "Use nb_points instead.")
104        self.warning(msg)
105        return self.nb_points
106
107    nr_points = property(_get_nr_points)
108
109
class ascanr(Macro, Hookable):
    """Example of how to add extra info columns to a scan.

    Does the same as ascan but repeats the acquisition "repeat" times at each
    step. It could be implemented deriving from aNscan, but it is written out
    explicitly for clarity.
    Look for the comments with "!!!" for tips specific to the extra info
    columns. Constraints are not supported here for simplicity (see ascan for
    that).

    Do an absolute scan of the specified motor, repeating measurements in each
    step. ascanr scans one motor, as specified by motor. The motor starts at
    the position given by start_pos and ends at the position given by
    final_pos. At each step, the acquisition will be repeated "repeat" times.
    The step size is (final_pos-start_pos)/nr_interv. The number of data points
    collected will be (nr_interv+1)*repeat. Count time for each acquisition is
    given by integ_time which if positive, specifies seconds and if negative,
    specifies monitor counts. """

    hints = {'scan': 'ascanr', 'allowsHooks': (
        'pre-move', 'post-move', 'pre-acq', 'post-acq', 'post-step')}
    env = ('ActiveMntGrp',)

    param_def = [
        ['motor',      Type.Moveable, None, 'Motor to move'],
        ['start_pos',  Type.Float,    None, 'Scan start position'],
        ['final_pos',  Type.Float,    None, 'Scan final position'],
        ['nr_interv',  Type.Integer,  None, 'Number of scan intervals'],
        ['integ_time', Type.Float,    None, 'Integration time'],
        ['repeat',     Type.Integer,  None, 'Number of Repetitions']
    ]

    def prepare(self, motor, start_pos, final_pos, nr_interv, integ_time, repeat,
                **opts):
        """Store the user parameters and create the step scan object."""
        self.starts = numpy.array([start_pos], dtype='d')
        self.finals = numpy.array([final_pos], dtype='d')
        self.nr_interv = nr_interv
        self.integ_time = integ_time
        self.repeat = repeat
        self.opts = opts

        # every one of the nr_interv + 1 positions is acquired "repeat" times
        self.nb_points = (nr_interv + 1) * repeat
        travel = self.finals - self.starts
        self.interv_sizes = travel / nr_interv
        self.name = 'ascanr'

        # !!! description of the extra column filled in by the generator
        repetition_column = ColumnDesc(name='repetition',
                                       dtype='int64', shape=())
        no_constraints = []

        # !!! the extra column descriptions are handed over to the scan
        self._gScan = SScan(self, self._generator, [motor],
                            opts.get('env', {}), no_constraints,
                            [repetition_column])

    def _generator(self):
        """Yield the step dictionary once per acquisition.

        The first acquisition at each position carries the target position;
        the following repetitions carry ``[None]`` as positions. The same
        dictionary object is updated and yielded every time.
        """
        step = {
            "integ_time": self.integ_time,
            "pre-move-hooks": self.getHooks('pre-move'),
            "post-move-hooks": self.getHooks('post-move'),
            "pre-acq-hooks": self.getHooks('pre-acq'),
            "post-acq-hooks": (self.getHooks('post-acq')
                               + self.getHooks('_NOHINTS_')),
            "post-step-hooks": self.getHooks('post-step'),
            "check_func": [],
        }
        nb_positions = self.nb_points // self.repeat
        for point_no in range(nb_positions):
            extrainfo = {"repetition": 0}  # !!!
            step['extrainfo'] = extrainfo  # !!!
            step["positions"] = self.starts + point_no * self.interv_sizes
            step["point_id"] = point_no
            yield step
            for repetition in range(1, self.repeat):
                extrainfo["repetition"] = repetition  # !!!
                step["positions"] = [None]
                yield step

    def run(self, *args):
        """Go through all the steps of the scan, yielding each of them."""
        scan_steps = self._gScan.step_scan()
        for current in scan_steps:
            yield current

    @property
    def data(self):
        """Scan data, as provided by the GScan object."""
        return self._gScan.data

    def _get_nr_points(self):
        """Deprecated accessor: warn and return ``nb_points``."""
        self.warning("nr_points is deprecated since version 3.0.3. "
                     "Use nb_points instead.")
        return self.nb_points

    nr_points = property(_get_nr_points)
198
199
class toothedtriangle(Macro, Hookable):
    """toothedtriangle macro implemented with the gscan framework.

    It performs nr_cycles cycles, each consisting of two stages: during the
    first half of the cycle it behaves like the ascan macro (from start_pos to
    final_pos in nr_interv+1 steps). During the second half of the cycle it
    steps back until it undoes the first half and is ready for the next cycle.
    After the last cycle one extra step at start_pos closes the loop.
    At each step, nr_samples acquisitions are performed.
    The total number of points in the scan is
    (nr_interv*2*nr_cycles+1)*nr_samples"""

    hints = {'scan': 'toothedtriangle',
             'allowsHooks': ('pre-scan', 'pre-move', 'post-move', 'pre-acq',
                             'post-acq', 'post-step', 'post-scan')
             }
    env = ('ActiveMntGrp',)

    param_def = [
        ['motor',      Type.Moveable, None, 'Motor to move'],
        ['start_pos',  Type.Float,    None, 'start position'],
        ['final_pos',  Type.Float,    None, 'position after half cycle'],
        ['nr_interv',  Type.Integer,  None, 'Number of intervals in half cycle'],
        ['integ_time', Type.Float,    None, 'Integration time'],
        ['nr_cycles',  Type.Integer,  None, 'Number of cycles'],
        ['nr_samples', Type.Integer,  1, 'Number of samples at each point']
    ]

    def prepare(self, motor, start_pos, final_pos, nr_interv, integ_time,
                nr_cycles, nr_samples, **opts):
        """Store the user parameters and create the step scan object."""
        self.start_pos = start_pos
        self.final_pos = final_pos
        self.nr_interv = nr_interv
        self.integ_time = integ_time
        self.nr_cycles = nr_cycles
        self.nr_samples = nr_samples
        self.opts = opts

        # a cycle visits nr_interv + 1 positions on the way forward and
        # nr_interv - 1 on the way back (the turning points are not repeated)
        cycle_nb_points = 2 * self.nr_interv
        # the trailing nr_samples are the closing step back at start_pos
        self.nb_points = cycle_nb_points * nr_samples * nr_cycles + nr_samples

        self.interv_size = (self.final_pos - self.start_pos) / nr_interv
        self.name = 'toothedtriangle'

        generator = self._generator
        moveable = MoveableDesc(moveable=motor, is_reference=True,
                                min_value=min(start_pos, final_pos),
                                max_value=max(start_pos, final_pos))
        moveables = [moveable]
        env = opts.get('env', {})
        constrains = []
        # !!! descriptions of the extra columns filled in by the generator
        extrainfodesc = [ColumnDesc(name='cycle', dtype='int64', shape=()),
                         ColumnDesc(name='interval',
                                    dtype='int64', shape=()),
                         ColumnDesc(name='sample', dtype='int64', shape=())]

        # !!! the extra column descriptions are handed over to the scan
        self._gScan = SScan(self, generator, moveables, env,
                            constrains, extrainfodesc)

    def _generator(self):
        """Yield the step dictionary once per acquisition.

        The same dictionary (and the same ``extrainfo`` dictionary) is updated
        and yielded every time; ``point_id`` increases by one per acquisition.
        """
        step = {}
        step["integ_time"] = self.integ_time
        step["pre-move-hooks"] = self.getHooks('pre-move')
        step["post-move-hooks"] = self.getHooks('post-move')
        step["pre-acq-hooks"] = self.getHooks('pre-acq')
        # hooks attached without any hint run after the acquisition; the
        # reserved hint is spelled '_NOHINTS_' (same as in ascanr)
        step["post-acq-hooks"] = (self.getHooks('post-acq')
                                  + self.getHooks('_NOHINTS_'))
        step["post-step-hooks"] = self.getHooks('post-step')
        step["check_func"] = []
        extrainfo = {"cycle": None, "interval": None, "sample": None, }
        step['extrainfo'] = extrainfo

        # interval indexes of one cycle: forward over all the positions, then
        # backward skipping both turning points
        halfcycle1 = list(range(self.nr_interv + 1))
        halfcycle2 = list(reversed(halfcycle1[1:-1]))
        intervallist = halfcycle1 + halfcycle2

        point_no = 0
        for cycle in range(self.nr_cycles):
            extrainfo["cycle"] = cycle
            for interval in intervallist:
                extrainfo["interval"] = interval
                step["positions"] = numpy.array(
                    [self.start_pos + interval * self.interv_size], dtype='d')
                for sample in range(self.nr_samples):
                    extrainfo["sample"] = sample
                    step["point_id"] = point_no
                    yield step
                    point_no += 1

        # last step for closing the loop ("cycle" keeps its last value)
        extrainfo["interval"] = 0
        step["positions"] = numpy.array([self.start_pos], dtype='d')
        for sample in range(self.nr_samples):
            extrainfo["sample"] = sample
            step["point_id"] = point_no
            yield step
            point_no += 1

    def run(self, *args):
        """Go through all the steps of the scan, yielding each of them."""
        for step in self._gScan.step_scan():
            yield step

    @property
    def data(self):
        """Scan data, as provided by the GScan object."""
        return self._gScan.data

    def _get_nr_points(self):
        """Deprecated accessor: warn and return ``nb_points``."""
        msg = ("nr_points is deprecated since version 3.0.3. "
               "Use nb_points instead.")
        self.warning(msg)
        return self.nb_points

    nr_points = property(_get_nr_points)
310
311
class a2scan_mod(Macro):
    """a2scan_mod.
    Do an a2scan in which each motor has its own number of intervals:
    nr_interv1 and nr_interv2.
    The motor with fewer intervals holds each of its positions for several
    consecutive steps of the motor with more intervals, so that both reach
    their final position at the last point.
    It uses the gscan framework.
    """

    hints = {'scan': 'a2scan_mod'}
    env = ('ActiveMntGrp',)

    param_def = [
        ['motor1',      Type.Moveable, None, 'Motor 1 to move'],
        ['start_pos1',  Type.Float,    None, 'Scan start position 1'],
        ['final_pos1',  Type.Float,    None, 'Scan final position 1'],
        ['nr_interv1',  Type.Integer,  None, 'Number of scan intervals of Motor 1'],
        ['motor2',      Type.Moveable, None, 'Motor 2 to move'],
        ['start_pos2',  Type.Float,    None, 'Scan start position 2'],
        ['final_pos2',  Type.Float,    None, 'Scan final position 2'],
        ['nr_interv2',  Type.Integer,  None, 'Number of scan intervals of Motor 2'],
        ['integ_time',  Type.Float,    None, 'Integration time']
    ]

    def prepare(self, motor1, start_pos1, final_pos1, nr_interv1, motor2, start_pos2, final_pos2, nr_interv2, integ_time,
                **opts):
        """Store the user parameters and create the step scan object."""
        self.name = 'a2scan_mod'
        self.integ_time = integ_time
        # first motor
        self.start_pos1 = start_pos1
        self.final_pos1 = final_pos1
        self.nr_interv1 = nr_interv1
        # second motor
        self.start_pos2 = start_pos2
        self.final_pos2 = final_pos2
        self.nr_interv2 = nr_interv2

        no_constraints = []
        self._gScan = SScan(self, self._generator, [motor1, motor2],
                            opts.get('env', {}), no_constraints)

    def _generator(self):
        """Yield the step dictionary once per point, with both positions.

        The same dictionary object is updated and yielded for every point.
        """
        step = {"integ_time": self.integ_time}

        first1, last1, n1 = self.start_pos1, self.final_pos1, self.nr_interv1
        first2, last2, n2 = self.start_pos2, self.final_pos2, self.nr_interv2

        # Evenly spaced positions, used as they are when both motors have
        # the same number of intervals
        positions_m1 = numpy.linspace(first1, last1, n1 + 1)
        positions_m2 = numpy.linspace(first2, last2, n2 + 1)

        # The motor with fewer intervals gets one position per step of the
        # other motor: the floor division maps each step index of the finer
        # motor onto an interval index of the coarser one
        if n1 > n2:
            size2 = (last2 - first2) / n2
            index2 = numpy.arange(n1 + 1) // (n1 / n2)
            positions_m2 = first2 + size2 * index2
        elif n2 > n1:
            size1 = (last1 - first1) / n1
            index1 = numpy.arange(n2 + 1) // (n2 / n1)
            positions_m1 = first1 + size1 * index1

        pairs = zip(positions_m1, positions_m2)
        for point_id, (pos1, pos2) in enumerate(pairs):
            step['point_id'] = point_id
            step['positions'] = [pos1, pos2]
            yield step

    def run(self, *args):
        """Go through all the steps of the scan, yielding each of them."""
        scan_steps = self._gScan.step_scan()
        for current in scan_steps:
            yield current
379
380
class ascanc_demo(Macro):
    """
    Minimal re-implementation of the ``ascanc`` macro, written to demonstrate
    the Generic Scan framework. The "real" implementation of
    :class:`sardana.macroserver.macros.ascanc` derives from
    :class:`sardana.macroserver.macros.aNscan` and provides some extra features.
    """

    # tells other code that this macro is a scan
    hints = {'scan': 'ascanc_demo'}
    # declares that the ActiveMntGrp environment variable must be set
    env = ('ActiveMntGrp',)

    param_def = [
        ['motor',      Type.Moveable, None, 'Motor to move'],
        ['start_pos',  Type.Float,    None, 'Scan start position'],
        ['final_pos',  Type.Float,    None, 'Scan final position'],
        ['integ_time', Type.Float,    None, 'Integration time']
    ]

    def prepare(self, motor, start_pos, final_pos, integ_time, **opts):
        """Store the user parameters and create the continuous scan object."""
        self.name = 'ascanc_demo'
        # positions are kept as one-element numpy arrays
        self.start = numpy.array([start_pos], dtype='d')
        self.final = numpy.array([final_pos], dtype='d')
        self.integ_time = integ_time
        # the "env" dictionary may be passed as an option
        scan_env = opts.get('env', {})

        # create an instance of GScan (in this case, of its child, CScan)
        self._gScan = CScan(self,
                            waypointGenerator=self._waypoint_generator,
                            periodGenerator=self._period_generator,
                            moveables=[motor],
                            env=scan_env)

    def _waypoint_generator(self):
        """Yield the two waypoints of the scan: the start and the final one."""
        # a very simple waypoint generator! only start and stop points!
        yield dict(positions=self.start, waypoint_id=0)
        yield dict(positions=self.final, waypoint_id=1)

    def _period_generator(self):
        """Yield the step dictionary forever; point ids count up from 1.

        The same dictionary object is updated and yielded every time.
        """
        step = {"integ_time": self.integ_time}
        # infinite generator. The acquisition loop is started/stopped at
        # begin and end of each waypoint
        point_no = 1
        while True:
            step["point_id"] = point_no
            yield step
            point_no += 1

    def run(self, *args):
        """Go through all the steps of the scan, yielding each of them."""
        scan_steps = self._gScan.step_scan()
        for current in scan_steps:
            yield current
435
436
class ascan_with_addcustomdata(ascan_demo):
    """
    Example of an ascan-like macro demonstrating how to pass custom data to
    the data handler.
    This is an extension of the ascan_demo macro. It makes several calls to
    :meth:`DataHandler.addCustomData` exemplifying different features.
    At least the following recorders will act on custom data:
      - OutputRecorder (this will ignore array data)
      - NXscan_FileRecorder
      - SPEC_FileRecorder (this will ignore array data)
    """

    def run(self, motor, start_pos, final_pos, nr_interv, integ_time, **opts):
        """Run the scan, adding custom data before and after the scan loop."""
        # we get the datahandler through the public property of the scan
        # object (the same one used by ascanct_midtrigger)
        dh = self._gScan.data_handler
        # at this point the entry name is not yet set, so we give it explicitly
        # (otherwise it would default to "entry")
        dh.addCustomData('Hello world1', 'dummyChar1',
                         nxpath='/custom_entry:NXentry/customdata:NXcollection')
        # this is the normal scan loop
        for step in self._gScan.step_scan():
            yield step
        # the entry number is known and the default nxpath
        # ("/<currententry>/custom_data") is used if none is given
        dh.addCustomData('Hello world1', 'dummyChar1')
        # you can pass arrays (but not all recorders will handle them)
        dh.addCustomData(list(range(10)), 'dummyArray1')
        # you can pass a custom nxpath *relative* to the current entry
        dh.addCustomData('Hello world2', 'dummyChar2',
                         nxpath='sample:NXsample')

        # calculate a linear fit to the timestamps VS motor positions and store
        # it
        motor_name = motor.getName()  # looked up once, not once per record
        records = self.data.records
        x = [record.data[motor_name] for record in records]
        y = [record.data['timestamp'] for record in records]
        fitted_y = numpy.polyval(numpy.polyfit(x, y, 1), x)
        dh.addCustomData(fitted_y, 'fittedtime',
                         nxpath='measurement:NXcollection')

        # as a bonus, plot the fit
        self.pyplot.plot(x, y, 'ro')
        self.pyplot.plot(x, fitted_y, 'b-')
        self.pyplot.draw()
479
480
class ascanct_midtrigger(Macro):
    """This macro demonstrates how to add an extra scan column with
    the shifted positions of the motor corresponding to the middle of the
    space interval. Be aware that the middle of the space interval does not
    necessarily correspond to the middle of the acquisition interval
    (remember about the latency time).

    This macro does not export all the ascanct features e.g. hooks.
    """

    param_def = [['motor', Type.Moveable, None, 'Moveable name'],
                 ['start_pos', Type.Float, None, 'Scan start position'],
                 ['final_pos', Type.Float, None, 'Scan final position'],
                 ['nr_interv', Type.Integer, None, 'Number of scan intervals'],
                 ['integ_time', Type.Float, None, 'Integration time'],
                 ['latency_time', Type.Float, 0, 'Latency time']]

    def run(self, *args, **kwargs):
        """Run ascanct and add the <motor_name>_mid custom data column.

        All the arguments are passed on to ascanct; the first positional one
        is the motor. If the scan produced fewer than two records, a warning
        is logged and no column is added.
        """
        motor = args[0]
        scan_macro = self.execMacro("ascanct", *args, **kwargs)
        # we get the datahandler
        # TODO: use public api to GScan object whenever
        # https://gitlab.com/sardana-org/sardana/-/issues/784 gets solved
        dh = scan_macro._gScan.data_handler
        motor_name = motor.getName()
        # an explicit array makes the shift below an element-wise operation
        # whatever the type of the recorded values (a plain list plus a float
        # would raise TypeError)
        positions = numpy.array(
            [record.data[motor_name] for record in scan_macro.data.records],
            dtype='d')
        if len(positions) < 2:
            # the interval size is taken from the first two positions
            self.warning("At least two scan points are necessary to "
                         "calculate the middle positions")
            return
        # half of the signed distance between the first two points: adding it
        # shifts the positions forward in a positive scan and backward in a
        # negative one
        half_interval = (positions[1] - positions[0]) / 2
        mid_positions = positions + half_interval
        # add custom data column to the measurement HDF5 group with the
        # <motor_name>_mid name
        dh.addCustomData(mid_positions, motor_name + '_mid',
                         nxpath='measurement:NXcollection')
515        # add custom data column to the measurement HDF5 group with the
516        # <motor_name>_mid name
517        dh.addCustomData(mid_positions, motor.getName() + '_mid',
518                         nxpath='measurement:NXcollection')