Macro scans examples
This chapter consists of a series of examples demonstrating specific features or tricks for programming custom scan macros using the Sardana scan framework.
1##############################################################################
2##
3# This file is part of Sardana
4##
5# http://www.sardana-controls.org/
6##
7# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
8##
9# Sardana is free software: you can redistribute it and/or modify
10# it under the terms of the GNU Lesser General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13##
14# Sardana is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU Lesser General Public License for more details.
18##
19# You should have received a copy of the GNU Lesser General Public License
20# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
21##
22##############################################################################
23
24"""
25 Macro library containning examples demonstrating specific features or tricks
26 for programming macros for Sardana.
27
28Available Macros are:
29 ascanr
30 toothedtriangle
31
32"""
33
34__all__ = ["ascan_demo", "ascanr", "toothedtriangle", "a2scan_mod"]
35
36__docformat__ = "restructuredtext"
37
38import numpy
39
40from sardana.macroserver.macro import Hookable, Macro, Type
41from sardana.macroserver.scan import ColumnDesc, CScan, MoveableDesc, SScan
42
43
44class ascan_demo(Macro):
45 """
46 This is a basic reimplementation of the ascan` macro for demonstration
47 purposes of the Generic Scan framework. The "real" implementation of
48 :class:`sardana.macroserver.macros.ascan` derives from
49 :class:`sardana.macroserver.macros.aNscan` and provides some extra features.
50 """
51
52 # this is used to indicate other codes that the macro is a scan
53 hints = {"scan": "ascan_demo"}
54 # this hints that the macro requires the ActiveMntGrp environment variable
55 # to be set
56 env = ("ActiveMntGrp",)
57
58 param_def = [
59 ["motor", Type.Moveable, None, "Motor to move"],
60 ["start_pos", Type.Float, None, "Scan start position"],
61 ["final_pos", Type.Float, None, "Scan final position"],
62 ["nr_interv", Type.Integer, None, "Number of scan intervals"],
63 ["integ_time", Type.Float, None, "Integration time"],
64 ]
65
66 def prepare(self, motor, start_pos, final_pos, nr_interv, integ_time, **opts):
67 # parse the user parameters
68 self.start = numpy.array([start_pos], dtype="d")
69 self.final = numpy.array([final_pos], dtype="d")
70 self.integ_time = integ_time
71
72 self.nb_points = nr_interv + 1
73 self.interv_size = (self.final - self.start) / nr_interv
74 self.name = "ascan_demo"
75 # the "env" dictionary may be passed as an option
76 env = opts.get("env", {})
77
78 # create an instance of GScan (in this case, of its child, SScan
79 self._gScan = SScan(self, generator=self._generator, moveables=[motor], env=env)
80
81 def _generator(self):
82 step = {}
83 # integ_time is the same for all steps
84 step["integ_time"] = self.integ_time
85 for point_no in range(self.nb_points):
86 step["positions"] = (
87 self.start + point_no * self.interv_size
88 ) # note that this is a numpy array
89 step["point_id"] = point_no
90 yield step
91
92 def run(self, *args):
93 for step in self._gScan.step_scan(): # just go through the steps
94 yield step
95
96 @property
97 def data(self):
98 return self._gScan.data # the GScan provides scan data
99
100 def _get_nr_points(self):
101 msg = "nr_points is deprecated since version 3.0.3. Use nb_points instead."
102 self.warning(msg)
103 return self.nb_points
104
105 nr_points = property(_get_nr_points)
106
107
108class ascanr(Macro, Hookable):
109 """This is an example of how to handle adding extra info columns in a scan.
110 Does the same than ascan but repeats the acquisitions "repeat" times for each step.
111 It could be implemented deriving from aNscan, but I do it like this for clarity.
112 Look for the comments with "!!!" for tips specific to the extra info columns
113 I do not support constrains in this one for simplicity (see ascan for that)
114
115 Do an absolute scan of the specified motor, repeating measurements in each step.
116 ascanr scans one motor, as specified by motor. The motor starts at the
117 position given by start_pos and ends at the position given by final_pos.
118 At each step, the acquisition will be repeated "repeat" times
119 The step size is (start_pos-final_pos)/nr_interv. The number of data points collected
120 will be (nr_interv+1)*repeat. Count time for each acquisition is given by time which if positive,
121 specifies seconds and if negative, specifies monitor counts."""
122
123 hints = {
124 "scan": "ascanr",
125 "allowsHooks": ("pre-move", "post-move", "pre-acq", "post-acq", "post-step"),
126 }
127 env = ("ActiveMntGrp",)
128
129 param_def = [
130 ["motor", Type.Moveable, None, "Motor to move"],
131 ["start_pos", Type.Float, None, "Scan start position"],
132 ["final_pos", Type.Float, None, "Scan final position"],
133 ["nr_interv", Type.Integer, None, "Number of scan intervals"],
134 ["integ_time", Type.Float, None, "Integration time"],
135 ["repeat", Type.Integer, None, "Number of Repetitions"],
136 ]
137
138 def prepare(
139 self, motor, start_pos, final_pos, nr_interv, integ_time, repeat, **opts
140 ):
141 self.starts = numpy.array([start_pos], dtype="d")
142 self.finals = numpy.array([final_pos], dtype="d")
143 self.nr_interv = nr_interv
144 self.integ_time = integ_time
145 self.repeat = repeat
146 self.opts = opts
147
148 self.nb_points = (nr_interv + 1) * repeat
149 self.interv_sizes = (self.finals - self.starts) / nr_interv
150 self.name = "ascanr"
151
152 generator = self._generator
153 moveables = [motor]
154 env = opts.get("env", {})
155 constrains = []
156 extrainfodesc = [ColumnDesc(name="repetition", dtype="int64", shape=())] # !!!
157
158 self._gScan = SScan(
159 self, generator, moveables, env, constrains, extrainfodesc
160 ) # !!!
161
162 def _generator(self):
163 step = {}
164 step["integ_time"] = self.integ_time
165 step["pre-move-hooks"] = self.getHooks("pre-move")
166 step["post-move-hooks"] = self.getHooks("post-move")
167 step["pre-acq-hooks"] = self.getHooks("pre-acq")
168 step["post-acq-hooks"] = self.getHooks("post-acq") + self.getHooks("_NOHINTS_")
169 step["post-step-hooks"] = self.getHooks("post-step")
170 step["check_func"] = []
171 for point_no in range(self.nb_points // self.repeat):
172 extrainfo = {"repetition": 0} # !!!
173 step["extrainfo"] = extrainfo # !!!
174 step["positions"] = self.starts + point_no * self.interv_sizes
175 step["point_id"] = point_no
176 yield step
177 for i in range(1, self.repeat):
178 extrainfo["repetition"] = i # !!!
179 step["positions"] = [None]
180 yield step
181
182 def run(self, *args):
183 for step in self._gScan.step_scan():
184 yield step
185
186 @property
187 def data(self):
188 return self._gScan.data
189
190 def _get_nr_points(self):
191 msg = "nr_points is deprecated since version 3.0.3. Use nb_points instead."
192 self.warning(msg)
193 return self.nb_points
194
195 nr_points = property(_get_nr_points)
196
197
198class toothedtriangle(Macro, Hookable):
199 """toothedtriangle macro implemented with the gscan framework.
200 It performs nr_cycles cycles, each consisting of two stages: the first half
201 of the cycle it behaves like the ascan macro (from start_pos to stop_pos in
202 nr_interv+1 steps).For the second half of the cycle it steps back until
203 it undoes the first half and is ready for the next cycle.
204 At each step, nr_samples acquisitions are performed.
205 The total number of points in the scan is nr_interv*2*nr_cycles*nr_samples+1"""
206
207 hints = {
208 "scan": "toothedtriangle",
209 "allowsHooks": (
210 "pre-scan",
211 "pre-move",
212 "post-move",
213 "pre-acq",
214 "post-acq",
215 "post-step",
216 "post-scan",
217 ),
218 }
219 env = ("ActiveMntGrp",)
220
221 param_def = [
222 ["motor", Type.Moveable, None, "Motor to move"],
223 ["start_pos", Type.Float, None, "start position"],
224 ["final_pos", Type.Float, None, "position after half cycle"],
225 ["nr_interv", Type.Integer, None, "Number of intervals in half cycle"],
226 ["integ_time", Type.Float, None, "Integration time"],
227 ["nr_cycles", Type.Integer, None, "Number of cycles"],
228 ["nr_samples", Type.Integer, 1, "Number of samples at each point"],
229 ]
230
231 def prepare(
232 self,
233 motor,
234 start_pos,
235 final_pos,
236 nr_interv,
237 integ_time,
238 nr_cycles,
239 nr_samples,
240 **opts,
241 ):
242 self.start_pos = start_pos
243 self.final_pos = final_pos
244 self.nr_interv = nr_interv
245 self.integ_time = integ_time
246 self.nr_cycles = nr_cycles
247 self.nr_samples = nr_samples
248 self.opts = opts
249 cycle_nb_points = self.nr_interv + 1 + (self.nr_interv + 1) - 2
250 self.nb_points = cycle_nb_points * nr_samples * nr_cycles + nr_samples
251
252 self.interv_size = (self.final_pos - self.start_pos) / nr_interv
253 self.name = "toothedtriangle"
254
255 generator = self._generator
256 moveables = []
257 moveable = MoveableDesc(
258 moveable=motor,
259 is_reference=True,
260 min_value=min(start_pos, final_pos),
261 max_value=max(start_pos, final_pos),
262 )
263 moveables = [moveable]
264 env = opts.get("env", {})
265 constrains = []
266 extrainfodesc = [
267 ColumnDesc(name="cycle", dtype="int64", shape=()),
268 ColumnDesc(name="interval", dtype="int64", shape=()),
269 ColumnDesc(name="sample", dtype="int64", shape=()),
270 ] # !!!
271
272 self._gScan = SScan(
273 self, generator, moveables, env, constrains, extrainfodesc
274 ) # !!!
275
276 def _generator(self):
277 step = {}
278 step["integ_time"] = self.integ_time
279 step["pre-move-hooks"] = self.getHooks("pre-move")
280 step["post-move-hooks"] = self.getHooks("post-move")
281 step["pre-acq-hooks"] = self.getHooks("pre-acq")
282 step["post-acq-hooks"] = self.getHooks("post-acq") + self.getHooks("_NOHINT_")
283 step["post-step-hooks"] = self.getHooks("post-step")
284 step["check_func"] = []
285 extrainfo = {
286 "cycle": None,
287 "interval": None,
288 "sample": None,
289 }
290 step["extrainfo"] = extrainfo
291 halfcycle1 = list(range(self.nr_interv + 1))
292 halfcycle2 = halfcycle1[1:-1]
293 halfcycle2.reverse()
294 intervallist = halfcycle1 + halfcycle2
295 point_no = 0
296 for cycle in range(self.nr_cycles):
297 extrainfo["cycle"] = cycle
298 for interval in intervallist:
299 extrainfo["interval"] = interval
300 step["positions"] = numpy.array(
301 [self.start_pos + (interval) * self.interv_size], dtype="d"
302 )
303 for sample in range(self.nr_samples):
304 extrainfo["sample"] = sample
305 step["point_id"] = point_no
306 yield step
307 point_no += 1
308
309 # last step for closing the loop
310 extrainfo["interval"] = 0
311 step["positions"] = numpy.array([self.start_pos], dtype="d")
312 for sample in range(self.nr_samples):
313 extrainfo["sample"] = sample
314 step["point_id"] = point_no
315 yield step
316 point_no += 1
317
318 def run(self, *args):
319 for step in self._gScan.step_scan():
320 yield step
321
322 @property
323 def data(self):
324 return self._gScan.data
325
326 def _get_nr_points(self):
327 msg = "nr_points is deprecated since version 3.0.3. Use nb_points instead."
328 self.warning(msg)
329 return self.nb_points
330
331 nr_points = property(_get_nr_points)
332
333
334class a2scan_mod(Macro):
335 """a2scan_mod.
336 Do an a2scan with the particularity of different intervals per motor: int_mot1, int_mot2.
337 If int_mot2 < int_mot1, mot2 will change position every int(int_mot1/int_mot2) steps of mot1.
338 It uses the gscan framework.
339 """
340
341 hints = {"scan": "a2scan_mod"}
342 env = ("ActiveMntGrp",)
343
344 param_def = [
345 ["motor1", Type.Moveable, None, "Motor 1 to move"],
346 ["start_pos1", Type.Float, None, "Scan start position 1"],
347 ["final_pos1", Type.Float, None, "Scan final position 1"],
348 ["nr_interv1", Type.Integer, None, "Number of scan intervals of Motor 1"],
349 ["motor2", Type.Moveable, None, "Motor 2 to move"],
350 ["start_pos2", Type.Float, None, "Scan start position 2"],
351 ["final_pos2", Type.Float, None, "Scan final position 2"],
352 ["nr_interv2", Type.Integer, None, "Number of scan intervals of Motor 2"],
353 ["integ_time", Type.Float, None, "Integration time"],
354 ]
355
356 def prepare(
357 self,
358 motor1,
359 start_pos1,
360 final_pos1,
361 nr_interv1,
362 motor2,
363 start_pos2,
364 final_pos2,
365 nr_interv2,
366 integ_time,
367 **opts,
368 ):
369 self.name = "a2scan_mod"
370 self.integ_time = integ_time
371 self.start_pos1 = start_pos1
372 self.final_pos1 = final_pos1
373 self.nr_interv1 = nr_interv1
374 self.start_pos2 = start_pos2
375 self.final_pos2 = final_pos2
376 self.nr_interv2 = nr_interv2
377
378 generator = self._generator
379 moveables = [motor1, motor2]
380 env = opts.get("env", {})
381 constraints = []
382 self._gScan = SScan(self, generator, moveables, env, constraints)
383
384 def _generator(self):
385 step = {}
386 step["integ_time"] = self.integ_time
387
388 start1, end1, interv1 = self.start_pos1, self.final_pos1, self.nr_interv1
389 start2, end2, interv2 = self.start_pos2, self.final_pos2, self.nr_interv2
390
391 # Prepare the positions
392 positions_m1 = numpy.linspace(start1, end1, interv1 + 1)
393 positions_m2 = numpy.linspace(start2, end2, interv2 + 1)
394
395 if interv1 > interv2:
396 positions_m2 = start2 + ((end2 - start2) / interv2) * (
397 numpy.arange(interv1 + 1) // (interv1 / interv2)
398 )
399 elif interv2 > interv1:
400 positions_m1 = start1 + ((end1 - start1) / interv1) * (
401 numpy.arange(interv2 + 1) // (interv2 / interv1)
402 )
403
404 point_id = 0
405 for pos1, pos2 in zip(positions_m1, positions_m2):
406 step["point_id"] = point_id
407 step["positions"] = [pos1, pos2]
408 yield step
409 point_id += 1
410
411 def run(self, *args):
412 for step in self._gScan.step_scan():
413 yield step
414
415
416class ascanc_demo(Macro):
417 """
418 This is a basic reimplementation of the ascanc` macro for demonstration
419 purposes of the Generic Scan framework. The "real" implementation of
420 :class:`sardana.macroserver.macros.ascanc` derives from
421 :class:`sardana.macroserver.macros.aNscan` and provides some extra features.
422 """
423
424 # this is used to indicate other codes that the macro is a scan
425 hints = {"scan": "ascanc_demo"}
426 # this hints that the macro requires the ActiveMntGrp environment variable
427 # to be set
428 env = ("ActiveMntGrp",)
429
430 param_def = [
431 ["motor", Type.Moveable, None, "Motor to move"],
432 ["start_pos", Type.Float, None, "Scan start position"],
433 ["final_pos", Type.Float, None, "Scan final position"],
434 ["integ_time", Type.Float, None, "Integration time"],
435 ]
436
437 def prepare(self, motor, start_pos, final_pos, integ_time, **opts):
438 self.name = "ascanc_demo"
439 # parse the user parameters
440 self.start = numpy.array([start_pos], dtype="d")
441 self.final = numpy.array([final_pos], dtype="d")
442 self.integ_time = integ_time
443 # the "env" dictionary may be passed as an option
444 env = opts.get("env", {})
445
446 # create an instance of GScan (in this case, of its child, CScan
447 self._gScan = CScan(
448 self,
449 waypointGenerator=self._waypoint_generator,
450 periodGenerator=self._period_generator,
451 moveables=[motor],
452 env=env,
453 )
454
455 def _waypoint_generator(self):
456 # a very simple waypoint generator! only start and stop points!
457 yield {"positions": self.start, "waypoint_id": 0}
458 yield {"positions": self.final, "waypoint_id": 1}
459
460 def _period_generator(self):
461 step = {}
462 step["integ_time"] = self.integ_time
463 point_no = 0
464 while True: # infinite generator. The acquisition loop is started/stopped at begin and end of each waypoint
465 point_no += 1
466 step["point_id"] = point_no
467 yield step
468
469 def run(self, *args):
470 for step in self._gScan.step_scan():
471 yield step
472
473
474class ascan_with_addcustomdata(ascan_demo):
475 """
476 example of an ascan-like macro where we demonstrate how to pass custom data to the data handler.
477 This is an extension of the ascan_demo macro. Wemake several calls to `:meth:DataHandler.addCustomData`
478 exemplifying different features.
479 At least the following recorders will act on custom data:
480 - OutputRecorder (this will ignore array data)
481 - NXscan_FileRecorder
482 - SPEC_FileRecorder (this will ignore array data)
483 """
484
485 def run(self, motor, start_pos, final_pos, nr_interv, integ_time, **opts):
486 # we get the datahandler
487 dh = self._gScan._data_handler
488 # at this point the entry name is not yet set, so we give it explicitly
489 # (otherwise it would default to "entry")
490 dh.addCustomData(
491 "Hello world1",
492 "dummyChar1",
493 nxpath="/custom_entry:NXentry/customdata:NXcollection",
494 )
495 # this is the normal scan loop
496 for step in self._gScan.step_scan():
497 yield step
498 # the entry number is known and the default nxpath is used
499 # "/<currententry>/custom_data") if none given
500 dh.addCustomData("Hello world1", "dummyChar1")
501 # you can pass arrays (but not all recorders will handle them)
502 dh.addCustomData(list(range(10)), "dummyArray1")
503 # you can pass a custom nxpath *relative* to the current entry
504 dh.addCustomData("Hello world2", "dummyChar2", nxpath="sample:NXsample")
505
506 # calculate a linear fit to the timestamps VS motor positions and store
507 # it
508 x = [r.data[motor.getName()] for r in self.data.records]
509 y = [r.data["timestamp"] for r in self.data.records]
510 fitted_y = numpy.polyval(numpy.polyfit(x, y, 1), x)
511 dh.addCustomData(fitted_y, "fittedtime", nxpath="measurement:NXcollection")
512
513 # as a bonus, plot the fit
514 self.pyplot.plot(x, y, "ro")
515 self.pyplot.plot(x, fitted_y, "b-")
516 self.pyplot.draw()
517
518
519class ascanct_midtrigger(Macro):
520 """This macro demonstrates how to add an extra scan column with
521 the shifted positions of the motor corresponding to the middle of the
522 space interval. Be aware that the space interval does not
523 necessarily correspond to the middle of the acquisition interval
524 (remember about the latency time).
525
526 This macro does not export all the ascanct features e.g. hooks.
527 """
528
529 param_def = [
530 ["motor", Type.Moveable, None, "Moveable name"],
531 ["start_pos", Type.Float, None, "Scan start position"],
532 ["final_pos", Type.Float, None, "Scan final position"],
533 ["nr_interv", Type.Integer, None, "Number of scan intervals"],
534 ["integ_time", Type.Float, None, "Integration time"],
535 ["latency_time", Type.Float, 0, "Latency time"],
536 ]
537
538 def run(self, *args, **kwargs):
539 motor = args[0]
540 scan_macro = self.execMacro("ascanct", *args, **kwargs)
541 # we get the datahandler
542 # TODO: use public api to GScan object whenever
543 # https://gitlab.com/sardana-org/sardana/-/issues/784 gets solved
544 dh = scan_macro._gScan.data_handler
545 # calculate position corresponding to the middle space interval
546 positions = [r.data[motor.getName()] for r in scan_macro.data.records]
547 first_position = positions[0]
548 second_position = positions[1]
549 positive_direction = second_position > first_position
550 shift = abs(second_position - first_position) / 2
551 if positive_direction:
552 mid_positions = positions + shift
553 else:
554 mid_positions = positions - shift
555 # add custom data column to the measurement HDF5 group with the
556 # <motor_name>_mid name
557 dh.addCustomData(
558 mid_positions, motor.getName() + "_mid", nxpath="measurement:NXcollection"
559 )