Macro scans examples

This chapter consists of a series of examples demonstrating specific features or tricks for programming custom scan macros using the Sardana scan framework.

  1##############################################################################
  2##
  3# This file is part of Sardana
  4##
  5# http://www.sardana-controls.org/
  6##
  7# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
  8##
  9# Sardana is free software: you can redistribute it and/or modify
 10# it under the terms of the GNU Lesser General Public License as published by
 11# the Free Software Foundation, either version 3 of the License, or
 12# (at your option) any later version.
 13##
 14# Sardana is distributed in the hope that it will be useful,
 15# but WITHOUT ANY WARRANTY; without even the implied warranty of
 16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 17# GNU Lesser General Public License for more details.
 18##
 19# You should have received a copy of the GNU Lesser General Public License
 20# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
 21##
 22##############################################################################
 23
 24"""
 25 Macro library containing examples demonstrating specific features or tricks
 26 for programming macros for Sardana.
 27
 28Available Macros are:
 29  ascanr
 30  toothedtriangle
 31
 32"""
 33
 34__all__ = ["ascan_demo", "ascanr", "toothedtriangle", "a2scan_mod"]
 35
 36__docformat__ = "restructuredtext"
 37
 38import numpy
 39
 40from sardana.macroserver.macro import Hookable, Macro, Type
 41from sardana.macroserver.scan import ColumnDesc, CScan, MoveableDesc, SScan
 42
 43
 44class ascan_demo(Macro):
 45    """
 46    This is a basic reimplementation of the ascan` macro for demonstration
 47    purposes of the Generic Scan framework. The "real" implementation of
 48    :class:`sardana.macroserver.macros.ascan` derives from
 49    :class:`sardana.macroserver.macros.aNscan` and provides some extra features.
 50    """
 51
 52    # this is used to indicate other codes that the macro is a scan
 53    hints = {"scan": "ascan_demo"}
 54    # this hints that the macro requires the ActiveMntGrp environment variable
 55    # to be set
 56    env = ("ActiveMntGrp",)
 57
 58    param_def = [
 59        ["motor", Type.Moveable, None, "Motor to move"],
 60        ["start_pos", Type.Float, None, "Scan start position"],
 61        ["final_pos", Type.Float, None, "Scan final position"],
 62        ["nr_interv", Type.Integer, None, "Number of scan intervals"],
 63        ["integ_time", Type.Float, None, "Integration time"],
 64    ]
 65
 66    def prepare(self, motor, start_pos, final_pos, nr_interv, integ_time, **opts):
 67        # parse the user parameters
 68        self.start = numpy.array([start_pos], dtype="d")
 69        self.final = numpy.array([final_pos], dtype="d")
 70        self.integ_time = integ_time
 71
 72        self.nb_points = nr_interv + 1
 73        self.interv_size = (self.final - self.start) / nr_interv
 74        self.name = "ascan_demo"
 75        # the "env" dictionary may be passed as an option
 76        env = opts.get("env", {})
 77
 78        # create an instance of GScan (in this case, of its child, SScan
 79        self._gScan = SScan(self, generator=self._generator, moveables=[motor], env=env)
 80
 81    def _generator(self):
 82        step = {}
 83        # integ_time is the same for all steps
 84        step["integ_time"] = self.integ_time
 85        for point_no in range(self.nb_points):
 86            step["positions"] = (
 87                self.start + point_no * self.interv_size
 88            )  # note that this is a numpy array
 89            step["point_id"] = point_no
 90            yield step
 91
 92    def run(self, *args):
 93        for step in self._gScan.step_scan():  # just go through the steps
 94            yield step
 95
 96    @property
 97    def data(self):
 98        return self._gScan.data  # the GScan provides scan data
 99
100    def _get_nr_points(self):
101        msg = "nr_points is deprecated since version 3.0.3. Use nb_points instead."
102        self.warning(msg)
103        return self.nb_points
104
105    nr_points = property(_get_nr_points)
106
107
class ascanr(Macro, Hookable):
    """This is an example of how to handle adding extra info columns in a scan.
    Does the same as ascan but repeats the acquisitions "repeat" times for each step.
    It could be implemented deriving from aNscan, but it is done like this for clarity.
    Look for the comments with "!!!" for tips specific to the extra info columns.
    Constraints are not supported in this one for simplicity (see ascan for that).

    Do an absolute scan of the specified motor, repeating measurements in each step.
    ascanr scans one motor, as specified by motor. The motor starts at the
    position given by start_pos and ends at the position given by final_pos.
    At each step, the acquisition will be repeated "repeat" times.
    The step size is (final_pos-start_pos)/nr_interv. The number of data points collected
    will be (nr_interv+1)*repeat. Count time for each acquisition is given by integ_time which
    if positive, specifies seconds and if negative, specifies monitor counts."""

    hints = {
        "scan": "ascanr",
        "allowsHooks": ("pre-move", "post-move", "pre-acq", "post-acq", "post-step"),
    }
    env = ("ActiveMntGrp",)

    param_def = [
        ["motor", Type.Moveable, None, "Motor to move"],
        ["start_pos", Type.Float, None, "Scan start position"],
        ["final_pos", Type.Float, None, "Scan final position"],
        ["nr_interv", Type.Integer, None, "Number of scan intervals"],
        ["integ_time", Type.Float, None, "Integration time"],
        ["repeat", Type.Integer, None, "Number of Repetitions"],
    ]

    def prepare(
        self, motor, start_pos, final_pos, nr_interv, integ_time, repeat, **opts
    ):
        """Store the scan parameters and build the step scan object.

        :raises ValueError: if ``repeat`` is lower than 1
        """
        # fail early: repeat == 0 would otherwise only show up as a
        # ZeroDivisionError inside the generator, once the scan has started
        if repeat < 1:
            raise ValueError("repeat must be at least 1 (got %r)" % (repeat,))

        self.starts = numpy.array([start_pos], dtype="d")
        self.finals = numpy.array([final_pos], dtype="d")
        self.nr_interv = nr_interv
        self.integ_time = integ_time
        self.repeat = repeat
        self.opts = opts

        self.nb_points = (nr_interv + 1) * repeat
        self.interv_sizes = (self.finals - self.starts) / nr_interv
        self.name = "ascanr"

        generator = self._generator
        moveables = [motor]
        env = opts.get("env", {})
        constrains = []
        # !!! describe the extra column that will hold the repetition number
        extrainfodesc = [ColumnDesc(name="repetition", dtype="int64", shape=())]

        # !!! the extra column descriptions are passed to the scan object
        self._gScan = SScan(
            self, generator, moveables, env, constrains, extrainfodesc
        )

    def _generator(self):
        """Yield one step per acquisition: ``repeat`` steps for each position.

        The same dictionary is updated and yielded again for every step.
        """
        step = {}
        step["integ_time"] = self.integ_time
        step["pre-move-hooks"] = self.getHooks("pre-move")
        step["post-move-hooks"] = self.getHooks("post-move")
        step["pre-acq-hooks"] = self.getHooks("pre-acq")
        # hooks attached without any hint are run together with the post-acq ones
        step["post-acq-hooks"] = self.getHooks("post-acq") + self.getHooks("_NOHINTS_")
        step["post-step-hooks"] = self.getHooks("post-step")
        step["check_func"] = []
        for point_no in range(self.nb_points // self.repeat):
            extrainfo = {"repetition": 0}  # !!!
            step["extrainfo"] = extrainfo  # !!!
            step["positions"] = self.starts + point_no * self.interv_sizes
            step["point_id"] = point_no
            yield step
            for i in range(1, self.repeat):
                extrainfo["repetition"] = i  # !!!
                # no target position is given for the repeated acquisitions
                step["positions"] = [None]
                yield step

    def run(self, *args):
        """Run the scan by walking through its steps."""
        for step in self._gScan.step_scan():
            yield step

    @property
    def data(self):
        """Scan data, as provided by the GScan object."""
        return self._gScan.data

    def _get_nr_points(self):
        """Deprecated accessor: warn and return ``nb_points``."""
        msg = "nr_points is deprecated since version 3.0.3. Use nb_points instead."
        self.warning(msg)
        return self.nb_points

    nr_points = property(_get_nr_points)
196
197
class toothedtriangle(Macro, Hookable):
    """toothedtriangle macro implemented with the gscan framework.
    It performs nr_cycles cycles, each consisting of two stages: the first half
    of the cycle it behaves like the ascan macro (from start_pos to final_pos in
    nr_interv+1 steps). For the second half of the cycle it steps back until
    it undoes the first half and is ready for the next cycle.
    At each step, nr_samples acquisitions are performed.
    The total number of points in the scan is
    (nr_interv*2*nr_cycles+1)*nr_samples"""

    hints = {
        "scan": "toothedtriangle",
        "allowsHooks": (
            "pre-scan",
            "pre-move",
            "post-move",
            "pre-acq",
            "post-acq",
            "post-step",
            "post-scan",
        ),
    }
    env = ("ActiveMntGrp",)

    param_def = [
        ["motor", Type.Moveable, None, "Motor to move"],
        ["start_pos", Type.Float, None, "start position"],
        ["final_pos", Type.Float, None, "position after half cycle"],
        ["nr_interv", Type.Integer, None, "Number of intervals in half cycle"],
        ["integ_time", Type.Float, None, "Integration time"],
        ["nr_cycles", Type.Integer, None, "Number of cycles"],
        ["nr_samples", Type.Integer, 1, "Number of samples at each point"],
    ]

    def prepare(
        self,
        motor,
        start_pos,
        final_pos,
        nr_interv,
        integ_time,
        nr_cycles,
        nr_samples,
        **opts,
    ):
        """Store the scan parameters and build the step scan object."""
        self.start_pos = start_pos
        self.final_pos = final_pos
        self.nr_interv = nr_interv
        self.integ_time = integ_time
        self.nr_cycles = nr_cycles
        self.nr_samples = nr_samples
        self.opts = opts
        # going up takes nr_interv + 1 positions and going back the same, but
        # the two turning points must not be counted twice
        cycle_nb_points = self.nr_interv + 1 + (self.nr_interv + 1) - 2
        # the trailing nr_samples are the acquisitions of the closing step
        self.nb_points = cycle_nb_points * nr_samples * nr_cycles + nr_samples

        self.interv_size = (self.final_pos - self.start_pos) / nr_interv
        self.name = "toothedtriangle"

        generator = self._generator
        moveable = MoveableDesc(
            moveable=motor,
            is_reference=True,
            min_value=min(start_pos, final_pos),
            max_value=max(start_pos, final_pos),
        )
        moveables = [moveable]
        env = opts.get("env", {})
        constrains = []
        # !!! describe the extra columns filled in by the generator
        extrainfodesc = [
            ColumnDesc(name="cycle", dtype="int64", shape=()),
            ColumnDesc(name="interval", dtype="int64", shape=()),
            ColumnDesc(name="sample", dtype="int64", shape=()),
        ]

        # !!! the extra column descriptions are passed to the scan object
        self._gScan = SScan(
            self, generator, moveables, env, constrains, extrainfodesc
        )

    def _generator(self):
        """Yield one step per acquisition of the toothed triangle.

        The same dictionary is updated and yielded again for every step.
        """
        step = {}
        step["integ_time"] = self.integ_time
        step["pre-move-hooks"] = self.getHooks("pre-move")
        step["post-move-hooks"] = self.getHooks("post-move")
        step["pre-acq-hooks"] = self.getHooks("pre-acq")
        # "_NOHINTS_" is the hint used by ascanr to fetch the hooks attached
        # without any hint; they are run together with the post-acq ones
        step["post-acq-hooks"] = self.getHooks("post-acq") + self.getHooks("_NOHINTS_")
        step["post-step-hooks"] = self.getHooks("post-step")
        step["check_func"] = []
        extrainfo = {
            "cycle": None,
            "interval": None,
            "sample": None,
        }
        step["extrainfo"] = extrainfo
        # interval indexes visited in one cycle: 0..nr_interv going up and
        # nr_interv-1..1 coming back (both turning points excluded)
        halfcycle1 = list(range(self.nr_interv + 1))
        halfcycle2 = halfcycle1[1:-1]
        halfcycle2.reverse()
        intervallist = halfcycle1 + halfcycle2
        point_no = 0
        for cycle in range(self.nr_cycles):
            extrainfo["cycle"] = cycle
            for interval in intervallist:
                extrainfo["interval"] = interval
                step["positions"] = numpy.array(
                    [self.start_pos + (interval) * self.interv_size], dtype="d"
                )
                for sample in range(self.nr_samples):
                    extrainfo["sample"] = sample
                    step["point_id"] = point_no
                    yield step
                    point_no += 1

        # last step for closing the loop
        extrainfo["interval"] = 0
        step["positions"] = numpy.array([self.start_pos], dtype="d")
        for sample in range(self.nr_samples):
            extrainfo["sample"] = sample
            step["point_id"] = point_no
            yield step
            point_no += 1

    def run(self, *args):
        """Run the scan by walking through its steps."""
        for step in self._gScan.step_scan():
            yield step

    @property
    def data(self):
        """Scan data, as provided by the GScan object."""
        return self._gScan.data

    def _get_nr_points(self):
        """Deprecated accessor: warn and return ``nb_points``."""
        msg = "nr_points is deprecated since version 3.0.3. Use nb_points instead."
        self.warning(msg)
        return self.nb_points

    nr_points = property(_get_nr_points)
332
333
class a2scan_mod(Macro):
    """a2scan_mod.
    Do an a2scan with the particularity of different intervals per motor: int_mot1, int_mot2.
    The scan has max(int_mot1, int_mot2)+1 points. The motor with fewer intervals
    only changes position, on average, every int_mot1/int_mot2 (or
    int_mot2/int_mot1) points, and reaches its final position at the last point.
    It uses the gscan framework.
    """

    hints = {"scan": "a2scan_mod"}
    env = ("ActiveMntGrp",)

    param_def = [
        ["motor1", Type.Moveable, None, "Motor 1 to move"],
        ["start_pos1", Type.Float, None, "Scan start position 1"],
        ["final_pos1", Type.Float, None, "Scan final position 1"],
        ["nr_interv1", Type.Integer, None, "Number of scan intervals of Motor 1"],
        ["motor2", Type.Moveable, None, "Motor 2 to move"],
        ["start_pos2", Type.Float, None, "Scan start position 2"],
        ["final_pos2", Type.Float, None, "Scan final position 2"],
        ["nr_interv2", Type.Integer, None, "Number of scan intervals of Motor 2"],
        ["integ_time", Type.Float, None, "Integration time"],
    ]

    def prepare(
        self,
        motor1,
        start_pos1,
        final_pos1,
        nr_interv1,
        motor2,
        start_pos2,
        final_pos2,
        nr_interv2,
        integ_time,
        **opts,
    ):
        """Store the scan parameters and build the step scan object."""
        self.name = "a2scan_mod"
        self.integ_time = integ_time
        self.start_pos1 = start_pos1
        self.final_pos1 = final_pos1
        self.nr_interv1 = nr_interv1
        self.start_pos2 = start_pos2
        self.final_pos2 = final_pos2
        self.nr_interv2 = nr_interv2

        generator = self._generator
        moveables = [motor1, motor2]
        env = opts.get("env", {})
        constraints = []
        self._gScan = SScan(self, generator, moveables, env, constraints)

    def _generator(self):
        """Yield one step per scan point with the positions of both motors.

        The same dictionary is updated and yielded again for every point.
        """
        step = {}
        step["integ_time"] = self.integ_time

        start1, end1, interv1 = self.start_pos1, self.final_pos1, self.nr_interv1
        start2, end2, interv2 = self.start_pos2, self.final_pos2, self.nr_interv2

        # Prepare the positions
        positions_m1 = numpy.linspace(start1, end1, interv1 + 1)
        positions_m2 = numpy.linspace(start2, end2, interv2 + 1)

        # The motor with fewer intervals is stretched over the points of the
        # other one: at point i it sits on its interval floor(i * few / many).
        # The index is computed with integers on purpose: floor-dividing by
        # the float ratio many / few is inexact (e.g. 10 // (10 / 3) == 2.0),
        # which made the motor miss its final position.
        if interv1 > interv2:
            interval_idx = (numpy.arange(interv1 + 1) * interv2) // interv1
            positions_m2 = start2 + ((end2 - start2) / interv2) * interval_idx
        elif interv2 > interv1:
            interval_idx = (numpy.arange(interv2 + 1) * interv1) // interv2
            positions_m1 = start1 + ((end1 - start1) / interv1) * interval_idx

        for point_id, (pos1, pos2) in enumerate(zip(positions_m1, positions_m2)):
            step["point_id"] = point_id
            step["positions"] = [pos1, pos2]
            yield step

    def run(self, *args):
        """Run the scan by walking through its steps."""
        for step in self._gScan.step_scan():
            yield step
414
415
class ascanc_demo(Macro):
    """
    Minimal re-implementation of the ``ascanc`` macro, written to demonstrate
    the Generic Scan framework. The "real" implementation of
    :class:`sardana.macroserver.macros.ascanc` derives from
    :class:`sardana.macroserver.macros.aNscan` and provides some extra features.
    """

    # tells other code that this macro is a scan
    hints = {"scan": "ascanc_demo"}
    # declares that the ActiveMntGrp environment variable must be set
    env = ("ActiveMntGrp",)

    param_def = [
        ["motor", Type.Moveable, None, "Motor to move"],
        ["start_pos", Type.Float, None, "Scan start position"],
        ["final_pos", Type.Float, None, "Scan final position"],
        ["integ_time", Type.Float, None, "Integration time"],
    ]

    def prepare(self, motor, start_pos, final_pos, integ_time, **opts):
        """Store the user parameters and build the continuous scan object."""
        self.name = "ascanc_demo"
        # one-element arrays, since a single motor is scanned
        self.start = numpy.array([start_pos], dtype="d")
        self.final = numpy.array([final_pos], dtype="d")
        self.integ_time = integ_time

        # CScan is the continuous-scan child of GScan; the "env" dictionary
        # may be passed in as an option
        scan_env = opts.get("env", {})
        self._gScan = CScan(
            self,
            waypointGenerator=self._waypoint_generator,
            periodGenerator=self._period_generator,
            moveables=[motor],
            env=scan_env,
        )

    def _waypoint_generator(self):
        """Yield the two waypoints of the scan: the start and the final one."""
        # as simple as it gets: no intermediate waypoints
        for waypoint_id, attr_name in enumerate(("start", "final")):
            yield {"positions": getattr(self, attr_name), "waypoint_id": waypoint_id}

    def _period_generator(self):
        """Yield acquisition steps forever, numbering them from 1.

        The same dictionary is updated and yielded again for every step.
        """
        step = {"integ_time": self.integ_time}
        point_id = 0
        # endless on purpose: the acquisition loop is started/stopped at the
        # begin and end of each waypoint
        while True:
            point_id += 1
            step["point_id"] = point_id
            yield step

    def run(self, *args):
        """Run the scan by walking through its steps."""
        for scan_step in self._gScan.step_scan():
            yield scan_step
472
473
class ascan_with_addcustomdata(ascan_demo):
    """
    Example of an ascan-like macro where we demonstrate how to pass custom data to the data handler.
    This is an extension of the ascan_demo macro. We make several calls to :meth:`DataHandler.addCustomData`
    exemplifying different features.
    At least the following recorders will act on custom data:
      - OutputRecorder (this will ignore array data)
      - NXscan_FileRecorder
      - SPEC_FileRecorder (this will ignore array data)
    """

    def run(self, motor, start_pos, final_pos, nr_interv, integ_time, **opts):
        """Run the scan, adding custom data before and after the scan loop."""
        # we get the datahandler (same accessor as used by ascanct_midtrigger)
        dh = self._gScan.data_handler
        # at this point the entry name is not yet set, so we give it explicitly
        # (otherwise it would default to "entry")
        dh.addCustomData(
            "Hello world1",
            "dummyChar1",
            nxpath="/custom_entry:NXentry/customdata:NXcollection",
        )
        # this is the normal scan loop
        for step in self._gScan.step_scan():
            yield step
        # the entry number is known and the default nxpath is used
        # ("/<currententry>/custom_data") if none given
        dh.addCustomData("Hello world1", "dummyChar1")
        # you can pass arrays (but not all recorders will handle them)
        dh.addCustomData(list(range(10)), "dummyArray1")
        # you can pass a custom nxpath *relative* to the current entry
        dh.addCustomData("Hello world2", "dummyChar2", nxpath="sample:NXsample")

        # calculate a linear fit to the timestamps VS motor positions and store
        # it
        motor_name = motor.getName()
        records = self.data.records
        x = [r.data[motor_name] for r in records]
        y = [r.data["timestamp"] for r in records]
        fitted_y = numpy.polyval(numpy.polyfit(x, y, 1), x)
        dh.addCustomData(fitted_y, "fittedtime", nxpath="measurement:NXcollection")

        # as a bonus, plot the fit
        self.pyplot.plot(x, y, "ro")
        self.pyplot.plot(x, fitted_y, "b-")
        self.pyplot.draw()
517
518
class ascanct_midtrigger(Macro):
    """This macro demonstrates how to add an extra scan column with
    the shifted positions of the motor corresponding to the middle of the
    space interval. Be aware that the middle of the space interval does not
    necessarily correspond to the middle of the acquisition interval
    (remember about the latency time).

    This macro does not export all the ascanct features e.g. hooks.
    """

    param_def = [
        ["motor", Type.Moveable, None, "Moveable name"],
        ["start_pos", Type.Float, None, "Scan start position"],
        ["final_pos", Type.Float, None, "Scan final position"],
        ["nr_interv", Type.Integer, None, "Number of scan intervals"],
        ["integ_time", Type.Float, None, "Integration time"],
        ["latency_time", Type.Float, 0, "Latency time"],
    ]

    def run(self, *args, **kwargs):
        """Run ``ascanct`` and add the ``<motor_name>_mid`` custom data column.

        The arguments are forwarded untouched to ``ascanct``; the first
        positional one is the scanned motor.
        """
        motor = args[0]
        motor_name = motor.getName()
        scan_macro = self.execMacro("ascanct", *args, **kwargs)
        # we get the datahandler
        # TODO: use public api to GScan object whenever
        # https://gitlab.com/sardana-org/sardana/-/issues/784 gets solved
        dh = scan_macro._gScan.data_handler
        # a numpy array is needed for the element-wise shift below: a plain
        # list of Python floats cannot be added to a scalar
        positions = numpy.array(
            [r.data[motor_name] for r in scan_macro.data.records], dtype="d"
        )
        # calculate position corresponding to the middle space interval: half
        # of the first interval, signed according to the scan direction
        half_interval = (positions[1] - positions[0]) / 2
        mid_positions = positions + half_interval
        # add custom data column to the measurement HDF5 group with the
        # <motor_name>_mid name
        dh.addCustomData(
            mid_positions, motor_name + "_mid", nxpath="measurement:NXcollection"
        )
559        )