Ticket #47971: patch-DTT-src-awg-awgbase.py.in.diff

File patch-DTT-src-awg-awgbase.py.in.diff, 7.1 KB (added by emaros, 9 years ago)

Patch to introduce new file.

  • DTT/src/awg/awgbase.py.in

    old new  
     1# -*- mode: python -*-
     2# Version $Id$
     3"""
     4ctypes wrapping of AWG API
     5
     6Author: Christopher Wipf, Massachusetts Institute of Technology
     7 
     8This module calls the libawg library for the standard set of
     9waveforms, the libtestpoint library for testpoint handling, and the
     10libSIStr library (from awgstream) for arbitrary loops and streams.
     11
     12"""
     13import os
     14import platform
     15from ctypes import CDLL, Structure, c_int, c_uint, c_longlong, c_ulonglong, \
     16    c_float, c_double, c_char, c_char_p, POINTER
     17from numpy import array
     18from numpy.ctypeslib import ndpointer
     19
     20__docformat__ = 'restructuredtext'
     21
     22sys = platform.system( )
     23ext = '.so'
     24awglibdir = '__libdir__'
     25if ( len(awglibdir) > 0 ):
     26    awglibdir += os.sep
     27if ( sys == 'Darwin' ):
     28    ext = '.dylib'
     29libawg = CDLL(awglibdir + 'libawg' + ext)
     30libtestpoint = CDLL(awglibdir + 'libtestpoint' + ext)
     31libSIStr = None
     32try:
     33    libSIStr = CDLL(awglibdir + 'libSIStr' + ext)
     34except:
     35    pass
     36
     37####
     38#### tconv.h
     39####
     40
     41# One epoch expressed in ns
     42_EPOCH = 1e9 / 16.0
     43
     44libawg.TAInow.argtypes = []
     45libawg.TAInow.restype = c_ulonglong
     46TAInow = libawg.TAInow
     47GPSnow = TAInow
     48
     49
     50####
     51#### awgtype.h
     52####
     53
     54# AWG_WaveType enum values
     55awgNone = 0
     56awgSine = 1
     57awgSquare = 2
     58awgRamp = 3
     59awgTriangle = 4
     60awgImpulse = 5
     61awgConst = 6
     62awgNoiseN = 7
     63awgNoiseU = 8
     64awgArb = 9
     65awgStream = 10
     66
     67# Phasing types
     68AWG_PHASING_STEP = 0
     69AWG_PHASING_LINEAR = 1
     70AWG_PHASING_QUADRATIC = 2
     71AWG_PHASING_LOG = 3
     72
     73class AWG_Component(Structure):
     74    _fields_ = [("wtype", c_uint),
     75                ("_par_ctypes", c_double * 4),
     76                ("start", c_ulonglong),
     77                ("duration", c_ulonglong),
     78                ("restart", c_ulonglong),
     79                ("ramptype", c_int),
     80                ("_ramptime_ctypes", c_ulonglong * 2),
     81                ("_ramppar_ctypes", c_double * 4)]
     82    @property
     83    def par(self):
     84        return tuple(self._par_ctypes)
     85
     86    @par.setter
     87    def par(self, val):
     88        self._par_ctypes = (c_double * 4)(*val)
     89
     90    @property
     91    def ramptime(self):
     92        return tuple(self._ramptime_ctypes)
     93
     94    @ramptime.setter
     95    def ramptime(self, val):
     96        self._ramptime_ctypes = (c_ulonglong * 2)(*val)
     97
     98    @property
     99    def ramppar(self):
     100        return tuple(self._ramppar_ctypes)
     101
     102    @ramppar.setter
     103    def ramppar(self, val):
     104        self._ramppar_ctypes = (c_double * 4)(*val)
     105
     106
     107####
     108#### awgapi.h
     109####
     110
     111libawg.awgSetChannel.argtypes = [c_char_p,]
     112libawg.awgSetChannel.restype = c_int
     113awgSetChannel = libawg.awgSetChannel
     114
     115libawg.awgAddWaveform.argtypes = [c_int, POINTER(AWG_Component), c_int]
     116libawg.awgAddWaveform.restype = c_int
     117def awgAddWaveform(slot, comp):
     118    comp_array = (AWG_Component * len(comp))(*comp)
     119    return libawg.awgAddWaveform(slot, comp_array, len(comp))
     120
     121libawg.awgStopWaveform.argtypes = [c_int, c_int, c_ulonglong]
     122libawg.awgStopWaveform.restype = c_int
     123awgStopWaveform = libawg.awgStopWaveform
     124
     125libawg.awgClearWaveforms.argtypes = [c_int,]
     126libawg.awgClearWaveforms.restype = c_int
     127awgClearWaveforms = libawg.awgClearWaveforms
     128
     129libawg.awgRemoveChannel.argtypes = [c_int,]
     130libawg.awgRemoveChannel.restype = c_int
     131awgRemoveChannel = libawg.awgRemoveChannel
     132
     133libawg.awgSetGain.argtypes = [c_int, c_double, c_ulonglong]
     134libawg.awgSetGain.restype = c_int
     135awgSetGain = libawg.awgSetGain
     136
     137libawg.awgSetFilter.argtypes = [c_int, POINTER(c_double), c_int]
     138libawg.awgSetFilter.restype = c_int
     139def awgSetFilter(slot, coeffs):
     140    coeffs_array = (c_double * len(coeffs))(*coeffs)
     141    return libawg.awgSetFilter(slot, coeffs_array, len(coeffs))
     142
     143libawg.awg_cleanup.argtypes = []
     144libawg.awg_cleanup.restype = None
     145awg_cleanup = libawg.awg_cleanup
     146
     147
     148####
     149#### testpoint.h
     150####
     151
     152libtestpoint.tpRequestName.argtypes = [c_char_p, c_ulonglong,
     153                                       POINTER(c_ulonglong), POINTER(c_int)]
     154libtestpoint.tpRequestName.restype = c_int
     155def tpRequestName(tpNames, timeout, time, epoch):
     156    if time is not None:
     157        time_ptr = POINTER(c_ulonglong)(time)
     158    else:
     159        time_ptr = POINTER(c_ulonglong)()
     160    if epoch is not None:
     161        epoch_ptr = POINTER(c_int)(epoch)
     162    else:
     163        epoch_ptr = POINTER(c_int)()
     164    return libtestpoint.tpRequestName(tpNames, timeout, time_ptr, epoch_ptr)
     165
     166libtestpoint.tpClearName.argtypes = [c_char_p,]
     167libtestpoint.tpClearName.restype = c_int
     168tpClearName = libtestpoint.tpClearName
     169
     170libtestpoint.testpoint_cleanup.argtypes = []
     171libtestpoint.testpoint_cleanup.restype = None
     172testpoint_cleanup = libtestpoint.testpoint_cleanup
     173
     174
     175if libSIStr:
     176  ####
     177  #### SIStr.h
     178  ####
     179
     180  # Compile-time parameters
     181  SIStr_MAXCHANNAMELENGTH = 64
     182  # Target "lead time" for sending waveform data, in NANOseconds
     183  SIStr_LEADTIME = 7 * 10**9
     184  # Status codes
     185  SIStr_OK = 0
     186
     187  class SIStrBuf(Structure):
     188    pass
     189
     190  SIStrBuf._fields_ = [('gpstime', c_int),
     191                       ('epoch', c_int),
     192                       ('iblock', c_int),
     193                       ('size', c_int),
     194                       ('ndata', c_int),
     195                       ('next', POINTER(SIStrBuf)),
     196                       ('data', POINTER(c_float))]
     197
     198  class SIStream(Structure):
     199    _fields_ = [('magic', c_int),
     200                ('id', c_int),
     201                ('channel', c_char * SIStr_MAXCHANNAMELENGTH),
     202                ('samprate', c_int),
     203                ('starttime', c_double),
     204                ('slot', c_int),
     205                ('tp', c_int),
     206                ('comp', c_int),
     207                ('blocksize', c_int),
     208                ('nblocks', c_int),
     209                ('curgps', c_int),
     210                ('curepoch', c_int),
     211                ('sentgps', c_int),
     212                ('sentepoch', c_int),
     213                ('nbufs', c_int),
     214                ('curbuf', POINTER(SIStrBuf)),
     215                ('firstbuf', POINTER(SIStrBuf)),
     216                ('lastbuf', POINTER(SIStrBuf)),
     217                ('lastsend', c_longlong),
     218                ('minwait', c_longlong),
     219                ('aborted', c_int)]
     220
     221  libSIStr.SIStrAppInfo.argtypes = [c_char_p,]
     222  libSIStr.SIStrAppInfo.restype = None
     223  SIStrAppInfo = libSIStr.SIStrAppInfo
     224 
     225  libSIStr.SIStrOpen.argtypes = [POINTER(SIStream), c_char_p, c_int, c_double]
     226  libSIStr.SIStrOpen.restype = c_int
     227  SIStrOpen = libSIStr.SIStrOpen
     228
     229  libSIStr.SIStrErrorMsg.argtypes = [c_int,]
     230  libSIStr.SIStrErrorMsg.restype = c_char_p
     231  SIStrErrorMsg = libSIStr.SIStrErrorMsg
     232
     233  libSIStr.SIStrAppend.argtypes = [POINTER(SIStream), ndpointer(c_float),
     234                                 c_int, c_float]
     235  libSIStr.SIStrAppend.restype = c_int
     236  def SIStrAppend(sis, data, scale):
     237      data_array = array(data).astype('f')
     238      return libSIStr.SIStrAppend(sis, data_array, data_array.size, scale)
     239
     240  libSIStr.SIStrClose.argtypes = [POINTER(SIStream),]
     241  libSIStr.SIStrClose.restype = c_int
     242  SIStrClose = libSIStr.SIStrClose
     243
     244  libSIStr.SIStrAbort.argtypes = [POINTER(SIStream),]
     245  libSIStr.SIStrAbort.restype = c_int
     246  SIStrAbort = libSIStr.SIStrAbort