Usar el struct
módulo de la biblioteca estándar de Python sería bastante fácil y extremadamente rápido ya que está escrito en C.
Así es como se puede utilizar para hacer lo que quiera. También permite omitir columnas de caracteres especificando valores negativos para el número de caracteres en el campo.
import struct
fieldwidths = (2, -10, 24)
fmtstring = ' '.join('{}{}'.format(abs(fw), 'x' if fw < 0 else 's')
for fw in fieldwidths)
fieldstruct = struct.Struct(fmtstring)
parse = fieldstruct.unpack_from
print('fmtstring: {!r}, recsize: {} chars'.format(fmtstring, fieldstruct.size))
line = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789\n'
fields = parse(line)
print('fields: {}'.format(fields))
Salida:
fmtstring: '2s 10x 24s', recsize: 36 chars
fields: ('AB', 'MNOPQRSTUVWXYZ0123456789')
Las siguientes modificaciones adaptarían su funcionamiento en Python 2 o 3 (y manejarían la entrada Unicode):
import struct
import sys
fieldstruct = struct.Struct(fmtstring)
if sys.version_info[0] < 3:
parse = fieldstruct.unpack_from
else:
unpack = fieldstruct.unpack_from
parse = lambda line: tuple(s.decode() for s in unpack(line.encode()))
Aquí hay una forma de hacerlo con cortes de cuerda, como estaba considerando, pero le preocupaba que pudiera ponerse demasiado feo. Lo bueno de esto es que, además de no ser tan feo, es que funciona sin cambios tanto en Python 2 como en 3, además de poder manejar cadenas Unicode. En cuanto a la velocidad, es, por supuesto, más lento que las versiones basadas en el struct
módulo, pero podría acelerarse un poco al eliminar la capacidad de tener campos de relleno.
try:
from itertools import izip_longest
except ImportError:
from itertools import zip_longest as izip_longest
try:
from itertools import accumulate
except ImportError:
def accumulate(iterable):
'Return running totals (simplified version).'
total = next(iterable)
yield total
for value in iterable:
total += value
yield total
def make_parser(fieldwidths):
cuts = tuple(cut for cut in accumulate(abs(fw) for fw in fieldwidths))
pads = tuple(fw < 0 for fw in fieldwidths)
flds = tuple(izip_longest(pads, (0,)+cuts, cuts))[:-1]
parse = lambda line: tuple(line[i:j] for pad, i, j in flds if not pad)
parse.size = sum(abs(fw) for fw in fieldwidths)
parse.fmtstring = ' '.join('{}{}'.format(abs(fw), 'x' if fw < 0 else 's')
for fw in fieldwidths)
return parse
line = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789\n'
fieldwidths = (2, -10, 24)
parse = make_parser(fieldwidths)
fields = parse(line)
print('format: {!r}, rec size: {} chars'.format(parse.fmtstring, parse.size))
print('fields: {}'.format(fields))
Salida:
format: '2s 10x 24s', rec size: 36 chars
fields: ('AB', 'MNOPQRSTUVWXYZ0123456789')
struct.unpack
parece funcionar con datos binarios. No puedo hacer que esto funcione.No estoy realmente seguro de si esto es eficiente, pero debería ser legible (en lugar de hacer el corte manualmente). Definí una función
slices
que obtiene una cadena y longitudes de columna, y devuelve las subcadenas. Lo convertí en un generador, por lo que para líneas realmente largas, no crea una lista temporal de subcadenas.def slices(s, *args): position = 0 for length in args: yield s[position:position + length] position += length
Ejemplo
In [32]: list(slices('abcdefghijklmnopqrstuvwxyz0123456789', 2)) Out[32]: ['ab'] In [33]: list(slices('abcdefghijklmnopqrstuvwxyz0123456789', 2, 10, 50)) Out[33]: ['ab', 'cdefghijkl', 'mnopqrstuvwxyz0123456789'] In [51]: d,c,h = slices('dogcathouse', 3, 3, 5) In [52]: d,c,h Out[52]: ('dog', 'cat', 'house')
Pero creo que la ventaja de un generador se pierde si necesita todas las columnas a la vez. De lo que uno podría beneficiarse es cuando desee procesar las columnas una por una, digamos en un bucle.
fuente
struct
, pero es legible y más fácil de manejar. He hecho algunas pruebas con elslices function
,struct
módulo y tambiénre
módulo y resulta que para archivos de gran tamaño,struct
es el más rápido,re
el segundo puesto (1,5 veces más lento) yslices
tercer (2x más lento). Sin embargo, hay una pequeña sobrecarga de usostruct
para queslices function
pueda ser más rápido en archivos más pequeños.Dos opciones más que son más fáciles y bonitas que las soluciones ya mencionadas:
El primero es usar pandas:
import pandas as pd path = 'filename.txt' # Using Pandas with a column specification col_specification = [(0, 20), (21, 30), (31, 50), (51, 100)] data = pd.read_fwf(path, colspecs=col_specification)
Y la segunda opción usando numpy.loadtxt:
import numpy as np # Using NumPy and letting it figure it out automagically data_also = np.loadtxt(path)
Realmente depende de la forma en la que desee utilizar sus datos.
fuente
colspecs='infer'
pandas.pydata.org/pandas-docs/stable/generated/…El siguiente código ofrece un bosquejo de lo que podría querer hacer si tiene que manejar un archivo de ancho de columna fijo.
"Serio" = múltiples tipos de registros en cada uno de los múltiples tipos de archivos, registros de hasta 1000 bytes, el que define el diseño y el productor / consumidor "opuesto" es un departamento gubernamental con actitud, los cambios de diseño dan como resultado columnas no utilizadas, hasta un millón de registros en un archivo, ...
Características: Precompila los formatos de estructura. Ignora columnas no deseadas. Convierte cadenas de entrada en tipos de datos requeridos (el boceto omite el manejo de errores). Convierte registros en instancias de objetos (o dictados o tuplas con nombre si lo prefiere).
Código:
import struct, datetime, io, pprint # functions for converting input fields to usable data cnv_text = rstrip cnv_int = int cnv_date_dmy = lambda s: datetime.datetime.strptime(s, "%d%m%Y") # ddmmyyyy # etc # field specs (field name, start pos (1-relative), len, converter func) fieldspecs = [ ('surname', 11, 20, cnv_text), ('given_names', 31, 20, cnv_text), ('birth_date', 51, 8, cnv_date_dmy), ('start_date', 71, 8, cnv_date_dmy), ] fieldspecs.sort(key=lambda x: x[1]) # just in case # build the format for struct.unpack unpack_len = 0 unpack_fmt = "" for fieldspec in fieldspecs: start = fieldspec[1] - 1 end = start + fieldspec[2] if start > unpack_len: unpack_fmt += str(start - unpack_len) + "x" unpack_fmt += str(end - start) + "s" unpack_len = end field_indices = range(len(fieldspecs)) print unpack_len, unpack_fmt unpacker = struct.Struct(unpack_fmt).unpack_from class Record(object): pass # or use named tuples raw_data = """\ ....v....1....v....2....v....3....v....4....v....5....v....6....v....7....v....8 Featherstonehaugh Algernon Marmaduke 31121969 01012005XX """ f = cStringIO.StringIO(raw_data) headings = f.next() for line in f: # The guts of this loop would of course be hidden away in a function/method # and could be made less ugly raw_fields = unpacker(line) r = Record() for x in field_indices: setattr(r, fieldspecs[x][0], fieldspecs[x][3](raw_fields[x])) pprint.pprint(r.__dict__) print "Customer name:", r.given_names, r.surname
Salida:
78 10x20s20s8s12x8s {'birth_date': datetime.datetime(1969, 12, 31, 0, 0), 'given_names': 'Algernon Marmaduke', 'start_date': datetime.datetime(2005, 1, 1, 0, 0), 'surname': 'Featherstonehaugh'} Customer name: Algernon Marmaduke Featherstonehaugh
fuente
struct.error: unpack_from requires a buffer of at least 1157 bytes
> str = '1234567890' > w = [0,2,5,7,10] > [ str[ w[i-1] : w[i] ] for i in range(1,len(w)) ] ['12', '345', '67', '890']
fuente
Aquí hay un módulo simple para Python 3, basado en la respuesta de John Machin : adáptelo según sea necesario :)
""" fixedwidth Parse and iterate through a fixedwidth text file, returning record objects. Adapted from https://stackoverflow.com/a/4916375/243392 USAGE import fixedwidth, pprint # define the fixed width fields we want # fieldspecs is a list of [name, description, start, width, type] arrays. fieldspecs = [ ["FILEID", "File Identification", 1, 6, "A/N"], ["STUSAB", "State/U.S. Abbreviation (USPS)", 7, 2, "A"], ["SUMLEV", "Summary Level", 9, 3, "A/N"], ["LOGRECNO", "Logical Record Number", 19, 7, "N"], ["POP100", "Population Count (100%)", 30, 9, "N"], ] # define the fieldtype conversion functions fieldtype_fns = { 'A': str.rstrip, 'A/N': str.rstrip, 'N': int, } # iterate over record objects in the file with open(f, 'rb'): for record in fixedwidth.reader(f, fieldspecs, fieldtype_fns): pprint.pprint(record.__dict__) # output: {'FILEID': 'SF1ST', 'LOGRECNO': 2, 'POP100': 1, 'STUSAB': 'TX', 'SUMLEV': '040'} {'FILEID': 'SF1ST', 'LOGRECNO': 3, 'POP100': 2, 'STUSAB': 'TX', 'SUMLEV': '040'} ... """ import struct, io # fieldspec columns iName, iDescription, iStart, iWidth, iType = range(5) def get_struct_unpacker(fieldspecs): """ Build the format string for struct.unpack to use, based on the fieldspecs. fieldspecs is a list of [name, description, start, width, type] arrays. Returns a string like "6s2s3s7x7s4x9s". """ unpack_len = 0 unpack_fmt = "" for fieldspec in fieldspecs: start = fieldspec[iStart] - 1 end = start + fieldspec[iWidth] if start > unpack_len: unpack_fmt += str(start - unpack_len) + "x" unpack_fmt += str(end - start) + "s" unpack_len = end struct_unpacker = struct.Struct(unpack_fmt).unpack_from return struct_unpacker class Record(object): pass # or use named tuples def reader(f, fieldspecs, fieldtype_fns): """ Wrap a fixedwidth file and return records according to the given fieldspecs. fieldspecs is a list of [name, description, start, width, type] arrays. fieldtype_fns is a dictionary of functions used to transform the raw string values, one for each type. """ # make sure fieldspecs are sorted properly fieldspecs.sort(key=lambda fieldspec: fieldspec[iStart]) struct_unpacker = get_struct_unpacker(fieldspecs) field_indices = range(len(fieldspecs)) for line in f: raw_fields = struct_unpacker(line) # split line into field values record = Record() for i in field_indices: fieldspec = fieldspecs[i] fieldname = fieldspec[iName] s = raw_fields[i].decode() # convert raw bytes to a string fn = fieldtype_fns[fieldspec[iType]] # get conversion function value = fn(s) # convert string to value (eg to an int) setattr(record, fieldname, value) yield record if __name__=='__main__': # test module import pprint, io # define the fields we want # fieldspecs are [name, description, start, width, type] fieldspecs = [ ["FILEID", "File Identification", 1, 6, "A/N"], ["STUSAB", "State/U.S. Abbreviation (USPS)", 7, 2, "A"], ["SUMLEV", "Summary Level", 9, 3, "A/N"], ["LOGRECNO", "Logical Record Number", 19, 7, "N"], ["POP100", "Population Count (100%)", 30, 9, "N"], ] # define a conversion function for integers def to_int(s): """ Convert a numeric string to an integer. Allows a leading ! as an indicator of missing or uncertain data. Returns None if no data. """ try: return int(s) except: try: return int(s[1:]) # ignore a leading ! except: return None # assume has a leading ! and no value # define the conversion fns fieldtype_fns = { 'A': str.rstrip, 'A/N': str.rstrip, 'N': to_int, # 'N': int, # 'D': lambda s: datetime.datetime.strptime(s, "%d%m%Y"), # ddmmyyyy # etc } # define a fixedwidth sample sample = """\ SF1ST TX04089000 00000023748 1 SF1ST TX04090000 00000033748! 2 SF1ST TX04091000 00000043748! """ sample_data = sample.encode() # convert string to bytes file_like = io.BytesIO(sample_data) # create a file-like wrapper around bytes # iterate over record objects in the file for record in reader(file_like, fieldspecs, fieldtype_fns): # print(record) pprint.pprint(record.__dict__)
fuente
Así es como resolví con un diccionario que contiene dónde comienzan y terminan los campos. Dar los puntos de inicio y final también me ayudó a gestionar los cambios a lo largo de la columna.
# fixed length # '---------- ------- ----------- -----------' line = '20.06.2019 myname active mydevice ' SLICES = {'date_start': 0, 'date_end': 10, 'name_start': 11, 'name_end': 18, 'status_start': 19, 'status_end': 30, 'device_start': 31, 'device_end': 42} def get_values_as_dict(line, SLICES): values = {} key_list = {key.split("_")[0] for key in SLICES.keys()} for key in key_list: values[key] = line[SLICES[key+"_start"]:SLICES[key+"_end"]].strip() return values >>> print (get_values_as_dict(line,SLICES)) {'status': 'active', 'name': 'myname', 'date': '20.06.2019', 'device': 'mydevice'}
fuente
Esto es lo que NumPy usa bajo el capó (mucho más simplificado, pero aún así, este código se encuentra
LineSplitter class
dentro de_iotools module
):import numpy as np DELIMITER = (20, 10, 10, 20, 10, 10, 20) idx = np.cumsum([0] + list(DELIMITER)) slices = [slice(i, j) for (i, j) in zip(idx[:-1], idx[1:])] def parse(line): return [line[s] for s in slices]
No maneja delimitadores negativos para ignorar la columna por lo que no es tan versátil como
struct
, pero es más rápido.fuente
El corte de cuerdas no tiene por qué ser feo siempre que lo mantenga organizado. Considere almacenar los anchos de sus campos en un diccionario y luego usar los nombres asociados para crear un objeto:
from collections import OrderedDict class Entry: def __init__(self, line): name2width = OrderedDict() name2width['foo'] = 2 name2width['bar'] = 3 name2width['baz'] = 2 pos = 0 for name, width in name2width.items(): val = line[pos : pos + width] if len(val) != width: raise ValueError("not enough characters: \'{}\'".format(line)) setattr(self, name, val) pos += width file = "ab789yz\ncd987wx\nef555uv" entry = [] for line in file.split('\n'): entry.append(Entry(line)) print(entry[1].bar) # output: 987
fuente
Debido a que mi trabajo anterior a menudo maneja 1 millón de líneas de datos de ancho fijo, investigué sobre este problema cuando comencé a usar Python.
Hay 2 tipos de FixedWidth
Si la cadena de recursos está compuesta por caracteres ascii, entonces ASCII FixedWidth = Unicode FixedWidth
Afortunadamente, la cadena y el byte son diferentes en py3, lo que reduce mucha confusión cuando se trata de caracteres codificados de doble byte (eggbk, big5, euc-jp, shift-jis, etc.).
Para el procesamiento de "ASCII FixedWidth", la cadena generalmente se convierte en bytes y luego se divide.
Sin importar módulos de terceros,
totalLineCount = 1 millón, lineLength = 800 bytes, FixedWidthArgs = (10,25,4, ....), divido la línea en aproximadamente 5 formas y obtengo la siguiente conclusión:
slice(bytes)
es más rápido queslice(string)
Cuando se trata de archivos grandes, a menudo usamos
with open ( file, "rb") as f:
.El método atraviesa uno de los archivos anteriores, aproximadamente 2,4 segundos.
Creo que el controlador adecuado, que procesa 1 millón de filas de datos, divide cada fila en 20 campos y tarda menos de 2,4 segundos.
Solo encuentro eso
stuct
yitemgetter
cumplo con los requisitosps: Para una visualización normal, convertí str unicode a bytes. Si se encuentra en un entorno de doble byte, no es necesario que haga esto.
from itertools import accumulate from operator import itemgetter def oprt_parser(sArgs): sum_arg = tuple(accumulate(abs(i) for i in sArgs)) # Negative parameter field index cuts = tuple(i for i,num in enumerate(sArgs) if num < 0) # Get slice args and Ignore fields of negative length ig_Args = tuple(item for i, item in enumerate(zip((0,)+sum_arg,sum_arg)) if i not in cuts) # Generate `operator.itemgetter` object oprtObj =itemgetter(*[slice(s,e) for s,e in ig_Args]) return oprtObj lineb = b'abcdefghijklmnopqrstuvwxyz\xb0\xa1\xb2\xbb\xb4\xd3\xb5\xc4\xb6\xee\xb7\xa2\xb8\xf6\xba\xcd0123456789' line = lineb.decode("GBK") # Unicode Fixed Width fieldwidthsU = (13, -13, 4, -4, 5,-5) # Negative width fields is ignored # ASCII Fixed Width fieldwidths = (13, -13, 8, -8, 5,-5) # Negative width fields is ignored # Unicode FixedWidth processing parse = oprt_parser(fieldwidthsU) fields = parse(line) print('Unicode FixedWidth','fields: {}'.format(tuple(map(lambda s: s.encode("GBK"), fields)))) # ASCII FixedWidth processing parse = oprt_parser(fieldwidths) fields = parse(lineb) print('ASCII FixedWidth','fields: {}'.format(fields)) line = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789\n' fieldwidths = (2, -10, 24) parse = oprt_parser(fieldwidths) fields = parse(line) print(f"fields: {fields}")
Salida:
Unicode FixedWidth fields: (b'abcdefghijklm', b'\xb0\xa1\xb2\xbb\xb4\xd3\xb5\xc4', b'01234') ASCII FixedWidth fields: (b'abcdefghijklm', b'\xb0\xa1\xb2\xbb\xb4\xd3\xb5\xc4', b'01234') fields: ('AB', 'MNOPQRSTUVWXYZ0123456789')
oprt_parser
es 4xmake_parser
(lista por comprensión + sector)Durante la investigación, se encontró que cuando la velocidad de la CPU es más rápida, parece que la eficiencia del
re
método aumenta más rápido.Como no tengo más y mejores computadoras para probar, proporcione mi código de prueba, si alguien está interesado, puede probarlo con una computadora más rápida.
Entorno de ejecución:
import timeit import time import re from itertools import accumulate from operator import itemgetter def eff2(stmt,onlyNum= False,showResult=False): '''test function''' if onlyNum: rl = timeit.repeat(stmt=stmt,repeat=roundI,number=timesI,globals=globals()) avg = sum(rl) / len(rl) return f"{avg * (10 ** 6)/timesI:0.4f}" else: rl = timeit.repeat(stmt=stmt,repeat=10,number=1000,globals=globals()) avg = sum(rl) / len(rl) print(f"【{stmt}】") print(f"\tquick avg = {avg * (10 ** 6)/1000:0.4f} s/million") if showResult: print(f"\t Result = {eval(stmt)}\n\t timelist = {rl}\n") else: print("") def upDouble(argList,argRate): return [c*argRate for c in argList] tbStr = "000000001111000002222真2233333333000000004444444QAZ55555555000000006666666ABC这些事中文字abcdefghijk" tbBytes = tbStr.encode("GBK") a20 = (4,4,2,2,2,3,2,2, 2 ,2,8,8,7,3,8,8,7,3, 12 ,11) a20U = (4,4,2,2,2,3,2,2, 1 ,2,8,8,7,3,8,8,7,3, 6 ,11) Slng = 800 rateS = Slng // 100 tStr = "".join(upDouble(tbStr , rateS)) tBytes = tStr.encode("GBK") spltArgs = upDouble( a20 , rateS) spltArgsU = upDouble( a20U , rateS) testList = [] timesI = 100000 roundI = 5 print(f"test round = {roundI} timesI = {timesI} sourceLng = {len(tStr)} argFieldCount = {len(spltArgs)}") print(f"pure str \n{''.ljust(60,'-')}") # ========================================== def str_parser(sArgs): def prsr(oStr): r = [] r_ap = r.append stt=0 for lng in sArgs: end = stt + lng r_ap(oStr[stt:end]) stt = end return tuple(r) return prsr Str_P = str_parser(spltArgsU) # eff2("Str_P(tStr)") testList.append("Str_P(tStr)") print(f"pure bytes \n{''.ljust(60,'-')}") # ========================================== def byte_parser(sArgs): def prsr(oBytes): r, stt = [], 0 r_ap = r.append for lng in sArgs: end = stt + lng r_ap(oBytes[stt:end]) stt = end return r return prsr Byte_P = byte_parser(spltArgs) # eff2("Byte_P(tBytes)") testList.append("Byte_P(tBytes)") # re,bytes print(f"re compile object \n{''.ljust(60,'-')}") # ========================================== def rebc_parser(sArgs,otype="b"): re_Args = "".join([f"(.{{{n}}})" for n in sArgs]) if otype == "b": rebc_Args = re.compile(re_Args.encode("GBK")) else: rebc_Args = re.compile(re_Args) def prsr(oBS): return rebc_Args.match(oBS).groups() return prsr Rebc_P = rebc_parser(spltArgs) # eff2("Rebc_P(tBytes)") testList.append("Rebc_P(tBytes)") Rebc_Ps = rebc_parser(spltArgsU,"s") # eff2("Rebc_Ps(tStr)") testList.append("Rebc_Ps(tStr)") print(f"struct \n{''.ljust(60,'-')}") # ========================================== import struct def struct_parser(sArgs): struct_Args = " ".join(map(lambda x: str(x) + "s", sArgs)) def prsr(oBytes): return struct.unpack(struct_Args, oBytes) return prsr Struct_P = struct_parser(spltArgs) # eff2("Struct_P(tBytes)") testList.append("Struct_P(tBytes)") print(f"List Comprehensions + slice \n{''.ljust(60,'-')}") # ========================================== import itertools def slice_parser(sArgs): tl = tuple(itertools.accumulate(sArgs)) slice_Args = tuple(zip((0,)+tl,tl)) def prsr(oBytes): return [oBytes[s:e] for s, e in slice_Args] return prsr Slice_P = slice_parser(spltArgs) # eff2("Slice_P(tBytes)") testList.append("Slice_P(tBytes)") def sliceObj_parser(sArgs): tl = tuple(itertools.accumulate(sArgs)) tl2 = tuple(zip((0,)+tl,tl)) sliceObj_Args = tuple(slice(s,e) for s,e in tl2) def prsr(oBytes): return [oBytes[so] for so in sliceObj_Args] return prsr SliceObj_P = sliceObj_parser(spltArgs) # eff2("SliceObj_P(tBytes)") testList.append("SliceObj_P(tBytes)") SliceObj_Ps = sliceObj_parser(spltArgsU) # eff2("SliceObj_Ps(tStr)") testList.append("SliceObj_Ps(tStr)") print(f"operator.itemgetter + slice object \n{''.ljust(60,'-')}") # ========================================== def oprt_parser(sArgs): sum_arg = tuple(accumulate(abs(i) for i in sArgs)) cuts = tuple(i for i,num in enumerate(sArgs) if num < 0) ig_Args = tuple(item for i,item in enumerate(zip((0,)+sum_arg,sum_arg)) if i not in cuts) oprtObj =itemgetter(*[slice(s,e) for s,e in ig_Args]) return oprtObj Oprt_P = oprt_parser(spltArgs) # eff2("Oprt_P(tBytes)") testList.append("Oprt_P(tBytes)") Oprt_Ps = oprt_parser(spltArgsU) # eff2("Oprt_Ps(tStr)") testList.append("Oprt_Ps(tStr)") print("|".join([s.split("(")[0].center(11," ") for s in testList])) print("|".join(["".center(11,"-") for s in testList])) print("|".join([eff2(s,True).rjust(11," ") for s in testList]))
Salida:
Test round = 5 timesI = 100000 sourceLng = 744 argFieldCount = 20 ... ... Str_P | Byte_P | Rebc_P | Rebc_Ps | Struct_P | Slice_P | SliceObj_P|SliceObj_Ps| Oprt_P | Oprt_Ps -----------|-----------|-----------|-----------|-- ---------|-----------|-----------|-----------|---- -------|----------- 9.6315| 7.5952| 4.4187| 5.6867| 1.5123| 5.2915| 4.2673| 5.7121| 2.4713| 3.9051
fuente