Macro scans examples
This chapter consists of a series of examples demonstrating specific features or tricks for programming custom scan macros using the Sardana scan framework.
1##############################################################################
2##
3# This file is part of Sardana
4##
5# http://www.sardana-controls.org/
6##
7# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
8##
9# Sardana is free software: you can redistribute it and/or modify
10# it under the terms of the GNU Lesser General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13##
14# Sardana is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU Lesser General Public License for more details.
18##
19# You should have received a copy of the GNU Lesser General Public License
20# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
21##
22##############################################################################
23
24"""
25 Macro library containing examples demonstrating specific features or tricks
26 for programming macros for Sardana.
27
28 Available Macros are:
29 ascanr
30 toothedtriangle
31
32"""
33
# Public names exported by this module: every example macro class defined in
# this file.
__all__ = [
    "ascan_demo",
    "ascanr",
    "toothedtriangle",
    "a2scan_mod",
    "ascanc_demo",
    "ascan_with_addcustomdata",
    "ascanct_midtrigger",
]
36
37__docformat__ = 'restructuredtext'
38
39import numpy
40
41from sardana.macroserver.macro import Macro, Hookable, Type
42from sardana.macroserver.scan import *
43
44
class ascan_demo(Macro):
    """
    This is a basic reimplementation of the ``ascan`` macro for demonstration
    purposes of the Generic Scan framework. The "real" implementation of
    :class:`sardana.macroserver.macros.ascan` derives from
    :class:`sardana.macroserver.macros.aNscan` and provides some extra
    features.
    """

    # this is used to indicate other codes that the macro is a scan
    hints = {'scan': 'ascan_demo'}
    # this hints that the macro requires the ActiveMntGrp environment variable
    # to be set
    env = ('ActiveMntGrp',)

    param_def = [
        ['motor', Type.Moveable, None, 'Motor to move'],
        ['start_pos', Type.Float, None, 'Scan start position'],
        ['final_pos', Type.Float, None, 'Scan final position'],
        ['nr_interv', Type.Integer, None, 'Number of scan intervals'],
        ['integ_time', Type.Float, None, 'Integration time']
    ]

    def prepare(self, motor, start_pos, final_pos, nr_interv, integ_time,
                **opts):
        """Parse the user parameters and create the step scan object.

        :param motor: moveable to be scanned
        :param start_pos: position of the first scan point
        :param final_pos: position of the last scan point
        :param nr_interv: number of intervals (the scan has nr_interv + 1
                          points)
        :param integ_time: integration time used at every point
        :param opts: extra options; an "env" dictionary may be passed here
        :raises ValueError: if nr_interv is smaller than 1
        """
        # Dividing the numpy arrays below by zero would not raise: it would
        # silently produce inf/NaN positions. Reject it explicitly instead.
        if nr_interv < 1:
            raise ValueError(
                "nr_interv must be at least 1 (got %s)" % nr_interv)

        # parse the user parameters
        self.start = numpy.array([start_pos], dtype='d')
        self.final = numpy.array([final_pos], dtype='d')
        self.integ_time = integ_time

        self.nb_points = nr_interv + 1
        self.interv_size = (self.final - self.start) / nr_interv
        self.name = 'ascan_demo'
        # the "env" dictionary may be passed as an option
        env = opts.get('env', {})

        # create an instance of GScan (in this case, of its child, SScan)
        self._gScan = SScan(self, generator=self._generator,
                            moveables=[motor], env=env)

    def _generator(self):
        """Yield one step dictionary per scan point.

        The same dictionary object is updated and yielded for every point.
        """
        step = {}
        # integ_time is the same for all steps
        step["integ_time"] = self.integ_time
        for point_no in range(self.nb_points):
            # note that this is a numpy array
            step["positions"] = self.start + point_no * self.interv_size
            step["point_id"] = point_no
            yield step

    def run(self, *args):
        """Execute the scan, yielding every step produced by the GScan."""
        # just go through the steps
        yield from self._gScan.step_scan()

    @property
    def data(self):
        """Scan data, as provided by the GScan object."""
        return self._gScan.data

    def _get_nr_points(self):
        """Deprecated accessor: warn and return ``nb_points``."""
        msg = ("nr_points is deprecated since version 3.0.3. "
               "Use nb_points instead.")
        self.warning(msg)
        return self.nb_points

    nr_points = property(_get_nr_points)
108
109
class ascanr(Macro, Hookable):
    """This is an example of how to handle adding extra info columns in a scan.
    Does the same as ascan but repeats the acquisitions "repeat" times for
    each step. It could be implemented deriving from aNscan, but it is done
    like this for clarity.
    Look for the comments with "!!!" for tips specific to the extra info
    columns. Constraints are not supported in this one for simplicity (see
    ascan for that).

    Do an absolute scan of the specified motor, repeating measurements in each
    step. ascanr scans one motor, as specified by motor. The motor starts at
    the position given by start_pos and ends at the position given by
    final_pos. At each step, the acquisition will be repeated "repeat" times.
    The step size is (final_pos-start_pos)/nr_interv. The number of data
    points collected will be (nr_interv+1)*repeat. The integration time of
    each acquisition is given by integ_time."""

    hints = {'scan': 'ascanr', 'allowsHooks': (
        'pre-move', 'post-move', 'pre-acq', 'post-acq', 'post-step')}
    env = ('ActiveMntGrp',)

    param_def = [
        ['motor', Type.Moveable, None, 'Motor to move'],
        ['start_pos', Type.Float, None, 'Scan start position'],
        ['final_pos', Type.Float, None, 'Scan final position'],
        ['nr_interv', Type.Integer, None, 'Number of scan intervals'],
        ['integ_time', Type.Float, None, 'Integration time'],
        ['repeat', Type.Integer, None, 'Number of Repetitions']
    ]

    def prepare(self, motor, start_pos, final_pos, nr_interv, integ_time,
                repeat, **opts):
        """Parse the user parameters and create the step scan object.

        :param motor: moveable to be scanned
        :param start_pos: position of the first scan point
        :param final_pos: position of the last scan point
        :param nr_interv: number of intervals between start and final
        :param integ_time: integration time of every acquisition
        :param repeat: number of acquisitions at each position
        :param opts: extra options; an "env" dictionary may be passed here
        :raises ValueError: if nr_interv or repeat is smaller than 1
        """
        # Dividing the numpy arrays below by zero would not raise: it would
        # silently produce inf/NaN positions. Reject it explicitly instead.
        if nr_interv < 1:
            raise ValueError(
                "nr_interv must be at least 1 (got %s)" % nr_interv)
        if repeat < 1:
            raise ValueError("repeat must be at least 1 (got %s)" % repeat)

        self.starts = numpy.array([start_pos], dtype='d')
        self.finals = numpy.array([final_pos], dtype='d')
        self.nr_interv = nr_interv
        self.integ_time = integ_time
        self.repeat = repeat
        self.opts = opts

        self.nb_points = (nr_interv + 1) * repeat
        self.interv_sizes = (self.finals - self.starts) / nr_interv
        self.name = 'ascanr'

        generator = self._generator
        moveables = [motor]
        env = opts.get('env', {})
        constrains = []
        # !!! describe the extra column that the generator fills in
        extrainfodesc = [ColumnDesc(name='repetition',
                                    dtype='int64', shape=())]

        # !!! the extra column description is passed to the scan object
        self._gScan = SScan(self, generator, moveables, env,
                            constrains, extrainfodesc)

    def _generator(self):
        """Yield one step dictionary per acquisition.

        For every position the step is yielded ``repeat`` times; the
        "repetition" entry of its extra info counts from 0 to repeat - 1.
        The same step dictionary object is updated and yielded each time.
        """
        step = {}
        step["integ_time"] = self.integ_time
        step["pre-move-hooks"] = self.getHooks('pre-move')
        step["post-move-hooks"] = self.getHooks('post-move')
        step["pre-acq-hooks"] = self.getHooks('pre-acq')
        step["post-acq-hooks"] = (self.getHooks('post-acq')
                                  + self.getHooks('_NOHINTS_'))
        step["post-step-hooks"] = self.getHooks('post-step')
        step["check_func"] = []
        for point_no in range(self.nr_interv + 1):
            extrainfo = {"repetition": 0}  # !!!
            step['extrainfo'] = extrainfo  # !!!
            step["positions"] = self.starts + point_no * self.interv_sizes
            step["point_id"] = point_no
            yield step
            for i in range(1, self.repeat):
                extrainfo["repetition"] = i  # !!!
                # presumably a None position means "do not move the motor";
                # confirm against the scan framework
                step["positions"] = [None]
                yield step

    def run(self, *args):
        """Execute the scan, yielding every step produced by the GScan."""
        yield from self._gScan.step_scan()

    @property
    def data(self):
        """Scan data, as provided by the GScan object."""
        return self._gScan.data

    def _get_nr_points(self):
        """Deprecated accessor: warn and return ``nb_points``."""
        msg = ("nr_points is deprecated since version 3.0.3. "
               "Use nb_points instead.")
        self.warning(msg)
        return self.nb_points

    nr_points = property(_get_nr_points)
198
199
class toothedtriangle(Macro, Hookable):
    """toothedtriangle macro implemented with the gscan framework.
    It performs nr_cycles cycles, each consisting of two stages: the first half
    of the cycle it behaves like the ascan macro (from start_pos to final_pos
    in nr_interv+1 steps). For the second half of the cycle it steps back until
    it undoes the first half and is ready for the next cycle.
    At each step, nr_samples acquisitions are performed.
    The total number of points in the scan is
    (nr_interv*2*nr_cycles+1)*nr_samples"""

    hints = {'scan': 'toothedtriangle',
             'allowsHooks': ('pre-scan', 'pre-move', 'post-move', 'pre-acq',
                             'post-acq', 'post-step', 'post-scan')
             }
    env = ('ActiveMntGrp',)

    param_def = [
        ['motor', Type.Moveable, None, 'Motor to move'],
        ['start_pos', Type.Float, None, 'start position'],
        ['final_pos', Type.Float, None, 'position after half cycle'],
        ['nr_interv', Type.Integer, None, 'Number of intervals in half cycle'],
        ['integ_time', Type.Float, None, 'Integration time'],
        ['nr_cycles', Type.Integer, None, 'Number of cycles'],
        ['nr_samples', Type.Integer, 1, 'Number of samples at each point']
    ]

    def prepare(self, motor, start_pos, final_pos, nr_interv, integ_time,
                nr_cycles, nr_samples, **opts):
        """Parse the user parameters and create the step scan object.

        :param motor: moveable to be scanned
        :param start_pos: position at the beginning of every cycle
        :param final_pos: position reached after half a cycle
        :param nr_interv: number of intervals in half a cycle
        :param integ_time: integration time of every acquisition
        :param nr_cycles: number of cycles
        :param nr_samples: number of acquisitions at each position
        :param opts: extra options; an "env" dictionary may be passed here
        """
        self.start_pos = start_pos
        self.final_pos = final_pos
        self.nr_interv = nr_interv
        self.integ_time = integ_time
        self.nr_cycles = nr_cycles
        self.nr_samples = nr_samples
        self.opts = opts
        # forth (nr_interv + 1 positions) and back (nr_interv + 1 positions)
        # minus the two turning points, which are not visited twice
        cycle_nb_points = self.nr_interv + 1 + (self.nr_interv + 1) - 2
        # the extra nr_samples are for the last step that closes the loop
        self.nb_points = cycle_nb_points * nr_samples * nr_cycles + nr_samples

        self.interv_size = (self.final_pos - self.start_pos) / nr_interv
        self.name = 'toothedtriangle'

        generator = self._generator
        moveable = MoveableDesc(moveable=motor, is_reference=True,
                                min_value=min(start_pos, final_pos),
                                max_value=max(start_pos, final_pos))
        moveables = [moveable]
        env = opts.get('env', {})
        constrains = []
        # !!! describe the extra columns that the generator fills in
        extrainfodesc = [ColumnDesc(name='cycle', dtype='int64', shape=()),
                         ColumnDesc(name='interval',
                                    dtype='int64', shape=()),
                         ColumnDesc(name='sample', dtype='int64', shape=())]

        # !!! the extra column descriptions are passed to the scan object
        self._gScan = SScan(self, generator, moveables, env,
                            constrains, extrainfodesc)

    def _generator(self):
        """Yield one step dictionary per acquisition.

        The same step and extra info dictionaries are updated and yielded
        for every acquisition.
        """
        step = {}
        step["integ_time"] = self.integ_time
        step["pre-move-hooks"] = self.getHooks('pre-move')
        step["post-move-hooks"] = self.getHooks('post-move')
        step["pre-acq-hooks"] = self.getHooks('pre-acq')
        # '_NOHINTS_' is the same hint that ascanr uses for its post-acq hooks
        step["post-acq-hooks"] = (self.getHooks('post-acq')
                                  + self.getHooks('_NOHINTS_'))
        step["post-step-hooks"] = self.getHooks('post-step')
        step["check_func"] = []
        extrainfo = {"cycle": None, "interval": None, "sample": None, }
        step['extrainfo'] = extrainfo
        # interval indexes visited on the way forth ...
        halfcycle1 = list(range(self.nr_interv + 1))
        # ... and on the way back, skipping both turning points
        halfcycle2 = halfcycle1[1:-1]
        halfcycle2.reverse()
        intervallist = halfcycle1 + halfcycle2
        point_no = 0
        for cycle in range(self.nr_cycles):
            extrainfo["cycle"] = cycle
            for interval in intervallist:
                extrainfo["interval"] = interval
                step["positions"] = numpy.array(
                    [self.start_pos + interval * self.interv_size], dtype='d')
                for sample in range(self.nr_samples):
                    extrainfo["sample"] = sample
                    step["point_id"] = point_no
                    yield step
                    point_no += 1

        # last step for closing the loop
        extrainfo["interval"] = 0
        step["positions"] = numpy.array([self.start_pos], dtype='d')
        for sample in range(self.nr_samples):
            extrainfo["sample"] = sample
            step["point_id"] = point_no
            yield step
            point_no += 1

    def run(self, *args):
        """Execute the scan, yielding every step produced by the GScan."""
        yield from self._gScan.step_scan()

    @property
    def data(self):
        """Scan data, as provided by the GScan object."""
        return self._gScan.data

    def _get_nr_points(self):
        """Deprecated accessor: warn and return ``nb_points``."""
        msg = ("nr_points is deprecated since version 3.0.3. "
               "Use nb_points instead.")
        self.warning(msg)
        return self.nb_points

    nr_points = property(_get_nr_points)
310
311
class a2scan_mod(Macro):
    """a2scan_mod.
    Do an a2scan with the particularity of different intervals per motor:
    nr_interv1, nr_interv2.
    The scan has max(nr_interv1, nr_interv2) + 1 points. The motor with more
    intervals moves at every point; the motor with fewer intervals keeps its
    position for several points, so that both motors reach their final
    positions at the last point.
    It uses the gscan framework.
    """

    hints = {'scan': 'a2scan_mod'}
    env = ('ActiveMntGrp',)

    param_def = [
        ['motor1', Type.Moveable, None, 'Motor 1 to move'],
        ['start_pos1', Type.Float, None, 'Scan start position 1'],
        ['final_pos1', Type.Float, None, 'Scan final position 1'],
        ['nr_interv1', Type.Integer, None,
         'Number of scan intervals of Motor 1'],
        ['motor2', Type.Moveable, None, 'Motor 2 to move'],
        ['start_pos2', Type.Float, None, 'Scan start position 2'],
        ['final_pos2', Type.Float, None, 'Scan final position 2'],
        ['nr_interv2', Type.Integer, None,
         'Number of scan intervals of Motor 2'],
        ['integ_time', Type.Float, None, 'Integration time']
    ]

    def prepare(self, motor1, start_pos1, final_pos1, nr_interv1, motor2,
                start_pos2, final_pos2, nr_interv2, integ_time, **opts):
        """Store the user parameters and create the step scan object.

        :param motor1: first moveable to be scanned
        :param start_pos1: start position of motor1
        :param final_pos1: final position of motor1
        :param nr_interv1: number of intervals of motor1
        :param motor2: second moveable to be scanned
        :param start_pos2: start position of motor2
        :param final_pos2: final position of motor2
        :param nr_interv2: number of intervals of motor2
        :param integ_time: integration time used at every point
        :param opts: extra options; an "env" dictionary may be passed here
        """
        self.name = 'a2scan_mod'
        self.integ_time = integ_time
        self.start_pos1 = start_pos1
        self.final_pos1 = final_pos1
        self.nr_interv1 = nr_interv1
        self.start_pos2 = start_pos2
        self.final_pos2 = final_pos2
        self.nr_interv2 = nr_interv2

        generator = self._generator
        moveables = [motor1, motor2]
        env = opts.get('env', {})
        constraints = []
        self._gScan = SScan(self, generator, moveables, env, constraints)

    def _generator(self):
        """Yield one step dictionary per scan point.

        The same dictionary object is updated and yielded for every point.
        """
        step = {}
        step["integ_time"] = self.integ_time

        start1, end1, interv1 = (self.start_pos1, self.final_pos1,
                                 self.nr_interv1)
        start2, end2, interv2 = (self.start_pos2, self.final_pos2,
                                 self.nr_interv2)

        # Prepare the positions
        positions_m1 = numpy.linspace(start1, end1, interv1 + 1)
        positions_m2 = numpy.linspace(start2, end2, interv2 + 1)

        # The motor with fewer intervals uses, at point i of the longer scan,
        # its own interval number floor(i * fewer / more). This is computed
        # with integers because the float form i // (more / fewer) suffers
        # from rounding (e.g. 10 // (10 / 3) == 2.0), which would prevent
        # that motor from reaching its final position.
        if interv1 > interv2:
            interval_idx = (numpy.arange(interv1 + 1) * interv2) // interv1
            positions_m2 = start2 + ((end2 - start2) / interv2) * interval_idx
        elif interv2 > interv1:
            interval_idx = (numpy.arange(interv2 + 1) * interv1) // interv2
            positions_m1 = start1 + ((end1 - start1) / interv1) * interval_idx

        for point_id, (pos1, pos2) in enumerate(zip(positions_m1,
                                                    positions_m2)):
            step['point_id'] = point_id
            step['positions'] = [pos1, pos2]
            yield step

    def run(self, *args):
        """Execute the scan, yielding every step produced by the GScan."""
        yield from self._gScan.step_scan()
379
380
class ascanc_demo(Macro):
    """
    Basic reimplementation of the ``ascanc`` macro, written to demonstrate
    the Generic Scan framework. The "real" implementation of
    :class:`sardana.macroserver.macros.ascanc` derives from
    :class:`sardana.macroserver.macros.aNscan` and provides some extra
    features.
    """

    # tells other codes that this macro is a scan
    hints = {'scan': 'ascanc_demo'}
    # the ActiveMntGrp environment variable must be set for this macro
    env = ('ActiveMntGrp',)

    param_def = [
        ['motor', Type.Moveable, None, 'Motor to move'],
        ['start_pos', Type.Float, None, 'Scan start position'],
        ['final_pos', Type.Float, None, 'Scan final position'],
        ['integ_time', Type.Float, None, 'Integration time']
    ]

    def prepare(self, motor, start_pos, final_pos, integ_time, **opts):
        """Store the user parameters and create the continuous scan object."""
        self.name = 'ascanc_demo'
        # user parameters
        self.start = numpy.array([start_pos], dtype='d')
        self.final = numpy.array([final_pos], dtype='d')
        self.integ_time = integ_time
        # an "env" dictionary may come among the options
        scan_env = opts.get('env', {})

        # instantiate a GScan (here its child, CScan)
        self._gScan = CScan(
            self,
            waypointGenerator=self._waypoint_generator,
            periodGenerator=self._period_generator,
            moveables=[motor],
            env=scan_env,
        )

    def _waypoint_generator(self):
        """Yield the two waypoints of the scan: the start and the final one."""
        # the simplest possible waypoint generator: start and stop points only
        first_waypoint = {"positions": self.start, "waypoint_id": 0}
        yield first_waypoint
        last_waypoint = {"positions": self.final, "waypoint_id": 1}
        yield last_waypoint

    def _period_generator(self):
        """Yield acquisition steps forever, numbering them from 1."""
        period = {"integ_time": self.integ_time}
        counter = 0
        # Endless generator: the acquisition loop is started/stopped at the
        # beginning and the end of each waypoint.
        while True:
            counter = counter + 1
            period["point_id"] = counter
            yield period

    def run(self, *args):
        """Execute the scan, yielding every step produced by the GScan."""
        for scan_step in self._gScan.step_scan():
            yield scan_step
435
436
class ascan_with_addcustomdata(ascan_demo):
    """
    Example of an ascan-like macro where we demonstrate how to pass custom
    data to the data handler.
    This is an extension of the ascan_demo macro. We make several calls to
    :meth:`DataHandler.addCustomData` exemplifying different features.
    At least the following recorders will act on custom data:
      - OutputRecorder (this will ignore array data)
      - NXscan_FileRecorder
      - SPEC_FileRecorder (this will ignore array data)
    """

    def run(self, motor, start_pos, final_pos, nr_interv, integ_time, **opts):
        """Run the scan, adding custom data before and after it.

        Only ``motor`` is used here; the remaining parameters are already
        consumed by the inherited ``prepare``.
        """
        # we get the datahandler (through the same public accessor that
        # ascanct_midtrigger uses)
        dh = self._gScan.data_handler
        # at this point the entry name is not yet set, so we give it explicitly
        # (otherwise it would default to "entry")
        dh.addCustomData('Hello world1', 'dummyChar1',
                         nxpath='/custom_entry:NXentry/customdata:NXcollection')
        # this is the normal scan loop
        yield from self._gScan.step_scan()
        # the entry number is known and the default nxpath is used
        # ("/<currententry>/custom_data") if none given
        dh.addCustomData('Hello world1', 'dummyChar1')
        # you can pass arrays (but not all recorders will handle them)
        dh.addCustomData(list(range(10)), 'dummyArray1')
        # you can pass a custom nxpath *relative* to the current entry
        dh.addCustomData('Hello world2', 'dummyChar2',
                         nxpath='sample:NXsample')

        # calculate a linear fit to the timestamps VS motor positions and store
        # it
        motor_name = motor.getName()
        records = self.data.records
        x = [r.data[motor_name] for r in records]
        y = [r.data['timestamp'] for r in records]
        fitted_y = numpy.polyval(numpy.polyfit(x, y, 1), x)
        dh.addCustomData(fitted_y, 'fittedtime',
                         nxpath='measurement:NXcollection')

        # as a bonus, plot the fit
        self.pyplot.plot(x, y, 'ro')
        self.pyplot.plot(x, fitted_y, 'b-')
        self.pyplot.draw()
479
480
class ascanct_midtrigger(Macro):
    """This macro demonstrates how to add an extra scan column with
    the shifted positions of the motor corresponding to the middle of the
    space interval. Be aware that the middle of the space interval does not
    necessarily correspond to the middle of the acquisition interval
    (remember about the latency time).

    This macro does not export all the ascanct features e.g. hooks.
    """

    param_def = [['motor', Type.Moveable, None, 'Moveable name'],
                 ['start_pos', Type.Float, None, 'Scan start position'],
                 ['final_pos', Type.Float, None, 'Scan final position'],
                 ['nr_interv', Type.Integer, None, 'Number of scan intervals'],
                 ['integ_time', Type.Float, None, 'Integration time'],
                 ['latency_time', Type.Float, 0, 'Latency time']]

    def run(self, *args, **kwargs):
        """Run ``ascanct`` and add the ``<motor_name>_mid`` custom data.

        All the arguments are forwarded to ``ascanct``; the first positional
        argument is the motor.
        """
        motor = args[0]
        motor_name = motor.getName()
        scan_macro = self.execMacro("ascanct", *args, **kwargs)
        # we get the datahandler
        # TODO: use public api to GScan object whenever
        # https://gitlab.com/sardana-org/sardana/-/issues/784 gets solved
        dh = scan_macro._gScan.data_handler
        # The positions must be a numpy array: adding a float to a plain list
        # raises TypeError.
        positions = numpy.array(
            [r.data[motor_name] for r in scan_macro.data.records], dtype='d')
        if len(positions) < 2:
            # the interval size is taken from the first two points
            self.warning("At least two scan points are needed to calculate "
                         "the middle positions")
            return
        # calculate position corresponding to the middle space interval: half
        # an interval further in the scan direction (the sign of the
        # difference already accounts for the direction)
        mid_positions = positions + (positions[1] - positions[0]) / 2
        # add custom data column to the measurement HDF5 group with the
        # <motor_name>_mid name
        dh.addCustomData(mid_positions, motor_name + '_mid',
                         nxpath='measurement:NXcollection')