Source code for PseudoNetCDF.camxfiles.uamiv.Memmap

__all__ = ['uamiv']
__doc__ = """
.. _Memmap
:mod:`Memmap` -- uamiv Memmap interface
============================================

.. module:: Memmap
   :platform: Unix, Windows
   :synopsis: Provides :ref:`PseudoNetCDF` memory map for CAMx
              uamiv files.  See PseudoNetCDF.sci_var.PseudoNetCDFFile
              for interface details
.. moduleauthor:: Barron Henderson <barronh@unc.edu>
"""

# Distribution packages
import unittest
# from warnings import warn

# Site-Packages
from numpy import array, memmap, dtype, linspace

# This Package modules
from PseudoNetCDF.sci_var import PseudoIOAPIVariable
from PseudoNetCDF.sci_var import PseudoNetCDFVariables
from PseudoNetCDF.ArrayTransforms import ConvertCAMxTime
from PseudoNetCDF.camxfiles.units import get_uamiv_units, get_chemparam_names
from PseudoNetCDF.cmaqfiles import ioapi_base
# from PseudoNetCDF.sci_var import PseudoNetCDFFile
# from PseudoNetCDF.conventions.ioapi import add_cf_from_ioapi
# for use in identifying uncaught nan


[docs]class uamiv(ioapi_base): """ uamiv provides a PseudoNetCDF interface for CAMx uamiv files. Where possible, the inteface follows IOAPI conventions (see www.baronams.com). ex: >>> uamiv_path = 'camx_uamiv.bin' >>> uamivfile = uamiv(uamiv_path) >>> uamivfile.variables.keys() ['TFLAG', 'O3', 'NO', 'NO2', ...] >>> tflag = uamivfile.variables['TFLAG'] >>> tflag.dimensions ('TSTEP', 'VAR', 'DATE-TIME') >>> tflag[0,0,:] array([2005185, 0]) >>> tflag[-1,0,:] array([2005185, 240000]) >>> v = uamivfile.variables['O3'] >>> v.dimensions ('TSTEP', 'LAY', 'ROW', 'COL') >>> v.shape (25, 28, 65, 83) >>> uamivfile.dimensions {'TSTEP': 25, 'LAY': 28, 'ROW': 65, 'COL': 83} """ __ione = 1 __idum = 0 __rdum = 0. def _make_header_fmt(self, ep=None): if ep is None: ep = self.__endianprefix self.__emiss_hdr_fmt = dtype(dict( names=['SPAD', 'name', 'note', 'itzon', 'nspec', 'ibdate', 'btime', 'iedate', 'etime', 'EPAD'], formats=['i', '(10, 4)S1', '(60,4)S1', 'i', 'i', 'i', 'f', 'i', 'f', 'i'])).newbyteorder(ep) self.__grid_hdr_fmt = dtype(dict( names=['SPAD', 'plon', 'plat', 'iutm', 'xorg', 'yorg', 'delx', 'dely', 'nx', 'ny', 'nz', 'iproj', 'istag', 'tlat1', 'tlat2', 'rdum', 'EPAD'], formats=['i', 'f', 'f', 'i', 'f', 'f', 'f', 'f', 'i', 'i', 'i', 'i', 'i', 'f', 'f', 'f', 'i'])).newbyteorder(ep) self.__cell_hdr_fmt = dtype(dict( names=['SPAD', 'ione1', 'ione2', 'nx', 'ny', 'EPAD'], formats=['i', 'i', 'i', 'i', 'i', 'i'])).newbyteorder(ep) self.__time_hdr_fmt = dtype(dict( names=['SPAD', 'ibdate', 'btime', 'iedate', 'etime', 'EPAD'], formats=['i', 'i', 'f', 'i', 'f', 'i'])).newbyteorder(ep) self.__spc_fmt = dtype("(10,4)S1").newbyteorder(ep) def __init__(self, rf, mode='r', P_ALP=None, P_BET=None, P_GAM=None, XCENT=None, YCENT=None, GDTYP=None, endian='big', chemparam=None): """ Parameters ---------- rf : string or RecordFile usually a path to a CAMx formatted file, can be a FortranFileUtil RecordFile object. mode : string file open mode read ('r'), write ('w'), append ('a', 'r+') P_ALP : float see IOAPI GRIDDESC documentation P_BET : float see IOAPI GRIDDESC documentation P_GAM : float see IOAPI GRIDDESC documentation XCENT : float see IOAPI GRIDDESC documentation YCENT : float see IOAPI GRIDDESC documentation GDTYP : float see IOAPI GRIDDESC documentation endian : string 'big' or 'little' usually only if mistaken compile chemparam : None or string used to identify gases and aerosols Returns ------- outf : uamiv PseudoNetCDFFile populated from file """ if chemparam is None: self._aerosol_names = None else: self._aerosol_names = get_chemparam_names(chemparam)['aerosol'] self.__endianprefix = dict(big='>', little='<')[endian] self._make_header_fmt() self.__rffile = rf self.__mode = mode self.createDimension('DATE-TIME', 2) self.__readheader() # Add IOAPI metavariables self.NLAYS = len(self.dimensions['LAY']) self.NROWS = len(self.dimensions['ROW']) self.NCOLS = len(self.dimensions['COL']) self.NVARS = len(self.dimensions['VAR']) self.NSTEPS = len(self.dimensions['TSTEP']) varlist = "".join([i.ljust(16) for i in self.__var_names__]) setattr(self, 'VAR-LIST', varlist) self.NAME = self.__emiss_hdr['name'][0, :, 0].copy().view('S10')[ 0].decode() self.NOTE = self.__emiss_hdr['note'][0, :, 0].copy().view('S60')[ 0].decode() self.ITZON = self.__emiss_hdr['itzon'][0] self.FTYPE = 1 self.VGTYP = 2 self.VGTOP = 10000. self.VGLVLS = linspace(0, 1, self.NLAYS + 1)[::-1] self.GDNAM = "CAMx " self.UPNAM = "CAMx " self.FILEDESC = "CAMx " # Create variables self.variables = PseudoNetCDFVariables( self.__variables, ['TFLAG', 'ETFLAG'] + self.__var_names__) tflag = ConvertCAMxTime(self.__memmap__['DATE']['BDATE'], self.__memmap__['DATE']['BTIME'], self.NVARS) etflag = ConvertCAMxTime(self.__memmap__['DATE']['EDATE'], self.__memmap__['DATE']['ETIME'], self.NVARS) tflagv = self.createVariable('TFLAG', 'i', ('TSTEP', 'VAR', 'DATE-TIME'), values=tflag, units='DATE-TIME', long_name='TFLAG'.ljust(16), var_desc='TFLAG'.ljust(80)) etflagv = self.createVariable('ETFLAG', 'i', ('TSTEP', 'VAR', 'DATE-TIME'), values=etflag, units='DATE-TIME', long_name='ETFLAG'.ljust(16), var_desc='Ending TFLAG'.ljust(80)) self.SDATE, self.STIME = self.variables['TFLAG'][0, 0, :] self.TSTEP = etflagv[0, 0, 1] - tflagv[0, 0, 1] if P_ALP is not None: self.P_ALP = P_ALP if P_BET is not None: self.P_BET = P_BET if P_GAM is not None: self.P_GAM = P_GAM if XCENT is not None: self.XCENT = XCENT if YCENT is not None: self.YCENT = YCENT if GDTYP is not None: self.GDTYP = GDTYP # try: # add_cf_from_ioapi(self) # except Exception as e: # warn(repr(e)) # pass def __checkfilelen(self): f = open(self.__rffile, 'rb') f.seek(0, 2) flen = f.tell() f.close() return flen
[docs] @classmethod def isMine(cls, path): self = uamiv.__new__(uamiv) cls._make_header_fmt(self, '>') offset = 0 emiss_hdr = memmap( path, mode='r', dtype=self.__emiss_hdr_fmt, shape=1, offset=offset) name = emiss_hdr['name'][0, :, 0].copy().view('S10')[ 0].decode().strip() if name in ('BOUNDARY', 'PTSOURCE'): return False if ( not emiss_hdr['SPAD'] == emiss_hdr['EPAD'] and emiss_hdr['SPAD'] == emiss_hdr.dtype.itemsize - 8 ): return False offset += emiss_hdr.dtype.itemsize * emiss_hdr.size grid_hdr = memmap( path, mode='r', dtype=self.__grid_hdr_fmt, shape=1, offset=offset) if ( not grid_hdr['SPAD'] == grid_hdr['EPAD'] and grid_hdr['SPAD'] == grid_hdr.dtype.itemsize - 8 ): return False offset += grid_hdr.dtype.itemsize * grid_hdr.size cell_hdr = memmap( path, mode='r', dtype=self.__cell_hdr_fmt, shape=1, offset=offset) if ( not cell_hdr['SPAD'] == cell_hdr['EPAD'] and cell_hdr['SPAD'] == cell_hdr.dtype.itemsize - 8 ): return False return name in ('AIRQUALITY', 'EMISSIONS', 'INSTANT', 'AVERAGE')
def __readheader(self): ep = self.__endianprefix offset = 0 self.__emiss_hdr = memmap( self.__rffile, mode=self.__mode, dtype=self.__emiss_hdr_fmt, shape=1, offset=offset) nspec = self.__emiss_hdr['nspec'][0] offset += self.__emiss_hdr.dtype.itemsize * self.__emiss_hdr.size self.__grid_hdr = memmap( self.__rffile, mode=self.__mode, dtype=self.__grid_hdr_fmt, shape=1, offset=offset) self.XORIG = self.__grid_hdr['xorg'][0] self.YORIG = self.__grid_hdr['yorg'][0] self.XCELL = self.__grid_hdr['delx'][0] self.YCELL = self.__grid_hdr['dely'][0] self.PLON = plon = self.__grid_hdr['plon'][0] self.PLAT = plat = self.__grid_hdr['plat'][0] self.TLAT1 = tlat1 = self.__grid_hdr['tlat1'][0] self.TLAT2 = tlat2 = self.__grid_hdr['tlat2'][0] self.IUTM = iutm = self.__grid_hdr['iutm'][0] self.ISTAG = self.__grid_hdr['istag'][0] self.CPROJ = cproj = self.__grid_hdr['iproj'][0] if hasattr(self, 'GDTYP'): GDTYPE = self.GDTYP else: GDTYPE = self.GDTYP = {0: 1, 1: 5, 2: 2, 3: 6}[cproj] if ( cproj == 0 or not all([x == 0 for x in [plon, plat, tlat1, tlat2, iutm, cproj]]) ): # Map CAMx projection constants to IOAPI self.XCENT = plon self.YCENT = plat if GDTYPE in (1, 2): self.P_ALP = tlat1 self.P_BET = tlat2 self.P_GAM = plon elif GDTYPE == 5: self.P_ALP = iutm self.P_BET = 0. self.P_GAM = 0. elif GDTYPE == 6: self.P_ALP = {90: 1, -90: -1}[plat] self.P_BET = tlat1 self.P_GAM = plon else: raise ValueError('Unknown projection') nx = self.__grid_hdr['nx'][0] ny = self.__grid_hdr['ny'][0] nz = max(self.__grid_hdr['nz'], array([1]))[0] offset += self.__grid_hdr.dtype.itemsize * self.__grid_hdr.size self.__cell_hdr = memmap( self.__rffile, mode=self.__mode, dtype=self.__cell_hdr_fmt, shape=1, offset=offset) offset += self.__cell_hdr.dtype.itemsize * self.__cell_hdr.size + 4 self.__spc_hdr = memmap(self.__rffile, mode=self.__mode, dtype=self.__spc_fmt, shape=nspec, offset=offset) offset += self.__spc_hdr.dtype.itemsize * self.__spc_hdr.size + 4 date_time_fmt = dtype(dict( names=['SPAD', 'BDATE', 'BTIME', 'EDATE', 'ETIME', 'EPAD'], formats=['i', 'i', 'f', 'i', 'f', 'i'])).newbyteorder(ep) date_time_block_size = 6 spc_1_lay_fmt = dtype(dict( names=['SPAD', 'IONE', 'SPC', 'DATA', 'EPAD'], formats=['i', 'i', '(10,4)S1', '(%d,%d)f' % (ny, nx), 'i'])).newbyteorder(ep) spc_1_lay_block_size = 13 + nx * ny spc_3d_fmt = dtype((spc_1_lay_fmt, (nz,))) # Get species names from spc_hdr var_names = [spc[:, 0].copy().view('S10')[0] for spc in self.__spc_hdr] var_names = [v.decode() if hasattr(v, 'decode') else v for v in var_names] self.__var_names__ = [''.join(v).strip() for v in var_names] data_block_fmt = dtype(dict( names=['DATE'] + self.__var_names__, formats=[date_time_fmt] + [spc_3d_fmt] * nspec)) data_block_size = (date_time_block_size + nspec * nz * spc_1_lay_block_size) f = open(self.__rffile) f.seek(0, 2) size = f.tell() f.close() del f ntimes = float(size - offset) / 4. / data_block_size if int(ntimes) != ntimes: raise ValueError( ("Partial time output (%f times); partial time indicates " + "incomplete file.") % ntimes) ntimes = int(ntimes) self.createDimension('LAY', nz) self.createDimension('COL', nx) self.createDimension('ROW', ny) tstep = self.createDimension('TSTEP', ntimes) tstep.setunlimited(True) self.createDimension('VAR', nspec) self.__memmap__ = memmap(self.__rffile, mode=self.__mode, dtype=data_block_fmt, offset=offset) def __variables(self, k): dimensions = ('TSTEP', 'LAY', 'ROW', 'COL') outvals = self.__memmap__[k]['DATA'] units = get_uamiv_units(self.NAME, k, self._aerosol_names) return PseudoIOAPIVariable(self, k, 'f', dimensions, values=outvals, units=units)
class TestMemmap(unittest.TestCase): def runTest(self): pass def setUp(self): pass def testAvg(self): import PseudoNetCDF.testcase self.assert_(uamiv.isMine( PseudoNetCDF.testcase.camxfiles_paths['uamiv'])) emissfile = uamiv(PseudoNetCDF.testcase.camxfiles_paths['uamiv']) emissfile.variables['TFLAG'] v = emissfile.variables['NO2'] checkv = array( [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.24175494e-04, 2.79196858e-04, 1.01672206e-03, 4.36782313e-04, 0.00000000e+00, 1.54810550e-04, 3.90250643e-04, 6.18023798e-04, 3.36963218e-04, 0.00000000e+00, 1.85579920e-04, 1.96825975e-04, 2.16468165e-04, 2.19882189e-04], dtype='f').reshape(1, 1, 4, 5) self.assert_((v == checkv).all()) def testClose(self): import PseudoNetCDF.testcase emissfile = uamiv(PseudoNetCDF.testcase.camxfiles_paths['uamiv']) emissfile.close() def testSync(self): import PseudoNetCDF.testcase emissfile = uamiv(PseudoNetCDF.testcase.camxfiles_paths['uamiv']) emissfile.sync() if __name__ == '__main__': unittest.main()