
Sniffing

tabbed.sniffing

Tools for determining the dialect and structure of a csv file that may contain metadata, a header, and a data section.

tabbed.sniffing.Sniffer

Bases: ReprMixin

A tool for inferring the dialect and structure of a CSV file.

The formatting of CSV files can vary widely. Python's built-in Sniffer is capable of handling different dialects (separators, line terminators, quotes, etc.) but assumes the first line of the file is a header or a row of unheaded data. In practice, many CSV files contain metadata prior to the header or data section. While these files are not compliant with CSV standards (RFC-4180), their broad use necessitates file sniffing that infers both dialect and structure. To date, some CSV readers such as Pandas' read_csv allow metadata rows to be skipped, but no formal mechanism for sniffing dialect, metadata and header information exists. This Sniffer supports these operations.

Attributes:

    infile:
        An open file, an IO instance.
    line_count:
        The number of lines in infile.
    start (int):
        The start line of infile for collecting a sample of 'amount' number of lines.
    amount (int):
        The number of infile lines to sample for dialect, header and metadata detection. The initial value defaults to the smaller of line_count or 100 lines. The amount should be large enough to include some of the data section of the file.
    skips (List[int]):
        Line numbers to ignore during sample collection.

Examples:

>>> import tempfile
>>> delimiter = ';'
>>> # make a metadata and add to text that will be written to tempfile
>>> metadata = {'exp': '3', 'name': 'Paul Dirac', 'date': '11/09/1942'}
>>> text = [delimiter.join([key, val]) for key, val in metadata.items()]
>>> # make a header and row to skip and add to text
>>> header = delimiter.join('group count color'.split())
>>> to_skip = delimiter.join('please ignore this line'.split())
>>> text.extend([header, to_skip])
>>> # make some data rows and add to text
>>> group = 'a c b b c a c b c a a c'.split()
>>> count = '22 2 13 15 4 19 4 21 5 24 18 1'.split()
>>> color = 'r g b b r r r g g  b b g'.split()
>>> data = [delimiter.join(row) for row in zip(group, count, color)]
>>> text.extend(data)
>>> # create a temp file and dump our text
>>> outfile = tempfile.TemporaryFile(mode='w+')
>>> _ = outfile.write('\n'.join(text))
>>> # create a sniffer
>>> sniffer = Sniffer(outfile)
>>> # change the sample amount to 10 lines and skip line 4
>>> # you would know to do this by inspecting the sample property
>>> # and seeing the problematic line 4
>>> sniffer.amount = 10
>>> sniffer.skips = [4]
>>> sniffer.sniff()
>>> print(sniffer.dialect)
SimpleDialect(';', '"', None)
>>> # ask the sniffer to return a Header
>>> header = sniffer.header(poll=4)
>>> print(header)
...
Header(line=3,
names=['group', 'count', 'color'],
string='group;count;color')
>>> # ask sniffer for the metadata given the header
>>> sniffer.metadata(header)
...
MetaData(lines=(0, 3),
string='exp;3\nname;Paul Dirac\ndate;11/09/1942')
>>> # ask for the column types and consistency of types
>>> # by polling the last 4 rows
>>> types, consistent = sniffer.types(poll=4)
>>> print(types)
[<class 'str'>, <class 'int'>, <class 'str'>]
>>> print(consistent)
True
>>> # close the temp outfile resource
>>> outfile.close()
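
The example above uses a temporary file, but the workflow with a file on disk is the same. A minimal sketch, assuming the import path from the module name shown above and a hypothetical file 'records.csv':

    from tabbed.sniffing import Sniffer  # assumed import path

    # keep the file open while sniffing; Sniffer seeks within it
    with open('records.csv', mode='r') as infile:  # hypothetical path
        sniffer = Sniffer(infile)
        # inspect the sample, then adjust start, amount or skips as needed
        print(sniffer.sample)
        header = sniffer.header(poll=10)
        meta = sniffer.metadata(header)
        types, consistent = sniffer.types(poll=10)
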
Source code in src/tabbed/sniffing.py
class Sniffer(ReprMixin):
    r"""A tool for inferring the dialect and structure of a CSV file.

    The formatting of CSV files can vary widely. Python's builtin Sniffer is
    capable of handling different dialects (separators, line terminators, quotes
    etc) but assumes the first line within the file is a header or a row of
    unheaded data. In practice, many CSV files contain metadata prior to the
    header or data section. While these files are not compliant with CSV
    standards (RFC-4180), their broad use necessitates file sniffing that infers
    both dialect and structure. To date, some CSV readers such as Pandas'
    read_csv allow metadata rows to be skipped, but no formal mechanism for
    sniffing dialect, metadata and header information exists. This Sniffer
    supports these operations.

    Attributes:
        infile:
            An open file, an IO instance.
        line_count:
            The number of lines in infile.
        start:
            The start line of infile for collecting a sample of 'amount' number
            of lines.
        amount:
            The number of infile lines to sample for dialect, header and
            metadata detection. The initial value defaults to the smaller of
            line_count or 100 lines. The amount should be large enough to
            include some of the data section of the file.
        skips:
            Line numbers to ignore during sample collection.

    Examples:
        >>> import tempfile
        >>> delimiter = ';'
        >>> # make a metadata and add to text that will be written to tempfile
        >>> metadata = {'exp': '3', 'name': 'Paul Dirac', 'date': '11/09/1942'}
        >>> text = [delimiter.join([key, val]) for key, val in metadata.items()]
        >>> # make a header and row to skip and add to text
        >>> header = delimiter.join('group count color'.split())
        >>> to_skip = delimiter.join('please ignore this line'.split())
        >>> text.extend([header, to_skip])
        >>> # make some data rows and add to text
        >>> group = 'a c b b c a c b c a a c'.split()
        >>> count = '22 2 13 15 4 19 4 21 5 24 18 1'.split()
        >>> color = 'r g b b r r r g g  b b g'.split()
        >>> data = [delimiter.join(row) for row in zip(group, count, color)]
        >>> text.extend(data)
        >>> # create a temp file and dump our text
        >>> outfile = tempfile.TemporaryFile(mode='w+')
        >>> _ = outfile.write('\n'.join(text))
        >>> # create a sniffer
        >>> sniffer = Sniffer(outfile)
        >>> # change the sample amount to 10 lines and skip line 4
        >>> # you would know to do this by inspecting the sample property
        >>> # and seeing the problematic line 4
        >>> sniffer.amount = 10
        >>> sniffer.skips = [4]
        >>> sniffer.sniff()
        >>> print(sniffer.dialect)
        SimpleDialect(';', '"', None)
        >>> # ask the sniffer to return a Header
        >>> header = sniffer.header(poll=4)
        >>> print(header)
        ... #doctest: +NORMALIZE_WHITESPACE
        Header(line=3,
        names=['group', 'count', 'color'],
        string='group;count;color')
        >>> # ask sniffer for the metadata given the header
        >>> sniffer.metadata(header)
        ... #doctest: +NORMALIZE_WHITESPACE
        MetaData(lines=(0, 3),
        string='exp;3\nname;Paul Dirac\ndate;11/09/1942')
        >>> # ask for the column types and consistency of types
        >>> # by polling the last 4 rows
        >>> types, consistent = sniffer.types(poll=4)
        >>> print(types)
        [<class 'str'>, <class 'int'>, <class 'str'>]
        >>> print(consistent)
        True
        >>> # close the temp outfile resource
        >>> outfile.close()
    """

    # help users set sane values for the sniffer
    # pylint: disable-next=R0917, dangerous-default-value
    def __init__(
        self,
        infile: IO[str],
        start: int = 0,
        amount: int = 100,
        skips: Optional[List[int]] = None,
        delimiters: List[str] | None = [',', ';', '|', '\t'],
    ) -> None:
        """Initialize this sniffer.

        Args:
            infile:
                An I/O stream instance such as returned by open.
            start:
                The start line of infile for collecting a sample of lines.
            amount:
                The number of infile lines to sample for dialect detection and
                locating header and metadata positions. The initial value defaults
                to the smaller of the infile's length or 100 lines.
            skips:
                Line numbers to ignore during sample collection.
            delimiters:
                A restricted list of delimiter strings for improving dialect
                detection. If None, any character will be considered a valid
                delimiter.

        Raises:
            StopIteration: Raised if start is greater than infile's size.

        Notes:
            Sniffer deviates from Python's Sniffer in that infile is strictly an
            IO stream, not a list because detecting the metadata and header
            structures requires movement within the file via 'seek'.
        """

        self.infile = infile
        self.infile.seek(0)
        self._start = start
        self._amount = amount
        self._skips = skips if skips else []
        # get sample for infile and sniff
        self._resample()
        self.sniff(delimiters)

    @property
    def start(self) -> int:
        """Returns the start line of this Sniffer's sample."""

        return self._start

    @start.setter
    def start(self, value: int) -> None:
        """Sets the start line & updates this Sniffer's sample

        Args:
            value:
                A new sample start line.
        """

        self._start = value
        self._resample()

    @property
    def amount(self) -> int:
        """Returns the number of lines in Sniffer's sample."""

        return self._amount

    @amount.setter
    def amount(self, value: int) -> None:
        """Sets the number of lines & updates this Sniffer's sample.

        Args:
            value:
                The new number of joined lines in the sample.
        """

        self._amount = value
        self._resample()

    @property
    def skips(self) -> List[int]:
        """Returns the skipped lines excluded from this Sniffer's sample."""

        return self._skips

    @skips.setter
    def skips(self, other: List[int]) -> None:
        """Sets the lines to exclude from this Sniffer's sample."""

        self._skips = other
        self._resample()

    @property
    def sample(self) -> str:
        """Returns this Sniffer's sample string."""

        return self._sample

    @property
    def lines(self) -> List[int]:
        """Returns a list of integer line numbers comprising the sample."""

        return self._lines

    @property
    def dialect(self) -> SimpleDialect | None:
        """Returns this Sniffer's dialect."""

        return self._dialect

    @dialect.setter
    def dialect(self, value: SimpleDialect | None) -> None:
        """Sets this Sniffer's dialect.

        Args:
            dialect:
                A clevercsv SimpleDialect instance containing a delimiter,
                escape character and quote character.

        Returns:
            None
        """

        if value:
            # python 3.11 deprecated '' for delimiter, escape & quotechars
            delimiter = '\r' if value.delimiter == '' else value.delimiter
            escapechar = None if value.escapechar == '' else value.escapechar
            quotechar = '"' if not value.quotechar else value.quotechar
            value.delimiter = delimiter
            value.escapechar = escapechar
            value.quotechar = quotechar

        self._dialect = value

    @property
    def rows(self) -> List[List[str]]:
        """Returns list of sample rows from this Sniffer's sample string.

        This method splits the sample string on newlines, strips trailing
        delimiters and removes double quotes.

        Returns:
            A list of list of strings from the sample string
        """

        if self.dialect is None:
            msg = "Dialect is unknown, please call sniff method or set dialect."
            raise TypeError(msg)

        result = []
        delimiter = self.dialect.delimiter

        # single column data uses carriage return delimiter
        if delimiter == '\r':
            return [
                [astr.replace('"', '')] for astr in self.sample.splitlines()
            ]

        # split sample_str on terminators, strip & split each line on delimiter
        for line in self.sample.splitlines():
            # lines may end in delimiter leading to empty trailing cells
            stripped = line.rstrip(delimiter)
            row = stripped.split(self.dialect.delimiter)
            # remove any double quotes
            row = [astring.replace('"', '') for astring in row]
            result.append(row)

        return result

    def _move(self, line: int) -> None:
        """Moves the line pointer in this file to line number.

        Args:
            line:
                A line number to move to within this Sniffer's infile.

        Returns:
            None but advances the line pointer to line.

        Raises:
            A StopIteration is issued if line is greater than Sniffer's infile
            size.
        """

        self.infile.seek(0)
        for _ in range(line):
            next(self.infile)

    def _resample(self) -> None:
        """Sample from infile using the start, amount and skip properties."""

        self._move(self.start)
        result = SimpleNamespace(indices=[], linestrs=[])
        amount = self.amount + len(self.skips)
        for current in range(self.start, amount + self.start):

            line = self.infile.readline()
            # only store non-blank lines
            if current not in self.skips and line:
                result.linestrs.append(line)
                result.indices.append(current)

        # move line pointer back to start of the file
        self._move(0)
        sampled = ''.join(result.linestrs)
        self._sample: str = sampled
        self._lines: List[int] = result.indices

    def sniff(self, delimiters: Optional[List[str]] = None) -> None:
        """Returns a clevercsv SimpleDialect from this instances sample.

        Dialect is detected using clevercsv's sniffer as it has shown improved
        dialect detection accuracy over Python's csv sniffer built-in.

        Args:
            delimiters:
                A string of possibly valid delimiters see csv.Sniffer.sniff.

        Returns:
            A SimpleDialect instance (see clevercsv.dialect) or None if sniffing
            is inconclusive.

        References:
            van den Burg, G.J.J., Nazábal, A. & Sutton, C. Wrangling messy CSV
            files by detecting row and type patterns. Data Min Knowl Disc 33,
            1799–1820 (2019). https://doi.org/10.1007/s10618-019-00646-y
        """

        # result is None if clevercsv's sniff is indeterminate
        result = clevercsv.Sniffer().sniff(self.sample, delimiters=delimiters)
        if result is None:
            msg1 = "Dialect could not be determined from Sniffer's sample.  "
            msg2 = "Please set this Sniffer's dialect attribute."
            warnings.warn(msg1 + msg2)
            self._dialect = None
        else:
            self.dialect = result

    # no mutation of exclude list here
    # pylint: disable-next=dangerous-default-value
    def types(
        self,
        poll: int,
        exclude: List[str] = ['', ' ', '-', 'nan', 'NaN', 'NAN'],
    ) -> Tuple[CellTypes, bool]:
        """Infer the column types from the last poll count rows.

        Args:
            poll:
                The number of last sample rows to poll for type.
            exclude:
                A sequence of characters that indicate missing values. Rows
                containing these strings will be ignored for type determination.

        Returns:
            A list of types and a boolean indicating if types are
            consistent across polled rows.
        """

        rows = self.rows[-poll:]
        rows = [row for row in rows if not bool(set(exclude).intersection(row))]
        if not rows:
            msg = (
                f'Types could not be determined as last {poll} polling '
                f'rows all contained at least one exclusion {exclude}. Try '
                'increasing the number of polling rows.'
            )
            raise RuntimeError(msg)

        cols = list(zip(*rows))
        type_cnts = [
            Counter([type(parsing.convert(el)) for el in col]) for col in cols
        ]
        consistent = all(len(cnt) == 1 for cnt in type_cnts)
        common_types = [cnt.most_common(1)[0][0] for cnt in type_cnts]

        return common_types, consistent

    # no mutation of exclude list here
    # pylint: disable-next=dangerous-default-value
    def datetime_formats(
        self,
        poll: int,
        exclude: List[str] = ['', ' ', '-', 'nan', 'NaN', 'NAN'],
    ) -> Tuple[List[str | None], bool]:
        """Infer time, date or datetime formats from last poll count rows.

        Args:
            poll:
                The number of last sample rows to poll for type and format
                consistency.

        Returns:
            A tuple containing a list of formats the same length as last polled
            row and a boolean indicating if the formats are consistent across
            the polled rows. Columns that are not time, date or datetime type
            have a format of None.
        """

        fmts = {
            time: parsing.time_formats(),
            date: parsing.date_formats(),
            datetime: parsing.datetime_formats(),
        }
        polled = []
        for row in self.rows[-poll:]:
            row_fmts = []
            for astring, tp in zip(row, self.types(poll, exclude)[0]):
                fmt = (
                    parsing.find_format(astring, fmts[tp])
                    if tp in fmts
                    else None
                )
                row_fmts.append(fmt)
            polled.append(row_fmts)

        # consistency within each column of polled
        consistent = all(len(set(col)) == 1 for col in list(zip(*polled)))

        return polled[-1], consistent

    def _length_diff(
        self,
        poll: int,
        exclude: List[str],
    ) -> Tuple[int | None, List[str] | None]:
        """Locates metadata by identifying the first row from the end of the
        sample whose length does not match the length of the last poll rows.

        This method assumes that the metadata row lengths do not match the data
        row lengths. This can obviously be untrue but detecting the difference
        between a header row whose length must match the number of data columns
        from a metadata row with the same number of columns is challenging.

        Args:
            poll:
                The number of last sample rows to poll for common types.
            exclude:
                A sequence of characters that indicate missing values. Rows
                containing these strings will be ignored.

        Returns:
            A 2-tuple of integer line number and the metadata row if found and
            a 2-tuple of Nones otherwise.
        """

        types, _ = self.types(poll, exclude)
        for idx, row in reversed(list(zip(self.lines, self.rows))):

            if len(row) != len(types):
                return idx, row

        return None, None

    def _type_diff(
        self,
        poll: int,
        exclude: List[str],
    ) -> Tuple[int | None, List[str] | None]:
        """Locates a header row by looking for the first row from the last of
        this Sniffer's rows whose types do not match the last polled row types.

        This heuristic assumes a consistent type within a column of data. If
        this is found to be untrue it returns a two-tuple of Nones.

        Args:
            poll:
                The number of last sample rows to poll for common types.
            exclude:
                A sequence of characters that indicate missing values. Rows
                containing these strings will be ignored.

        Returns:
            A 2-tuple integer line number and header row or a 2-tuple of Nones.
        """

        types, consistent = self.types(poll, exclude)
        # if polled types are inconsistent type_diff will fail.
        if not consistent:
            return None, None

        # int, float and complex mismatches are not type mismatches
        numerics = {int, float, complex}
        for idx, row in reversed(list(zip(self.lines, self.rows))):

            # ignore rows that have missing values
            if bool(set(exclude).intersection(row)):
                continue

            if len(row) != len(types):
                # we've encountered a metadata row without hitting a header
                return None, None

            row_types = [type(parsing.convert(el)) for el in row]
            # check types
            for typ, expect in zip(row_types, types):
                if typ != expect and not {typ, expect}.issubset(numerics):
                    return idx, row

        return None, None

    def _string_diff(
        self,
        poll: int,
        exclude: List[str],
        len_requirement: bool = True,
    ) -> Tuple[int | None, List[str] | None]:
        """Locates first row from last whose strings have no overlap with
        strings in the last poll rows.

        Args:
            poll:
                The number of last sample rows to poll for string values.

            exclude:
                A sequence of characters that indicate missing values. Rows
                containing these strings will be ignored.
            len_requirement:
                A boolean indicating if the first row from last with a type
                mismatch must have the same length as the last row of the
                sample. This will be True for headers and False for metadata.

        Returns:
            An integer line number and header row or a 2-tuple of Nones
        """

        observed = set(chain.from_iterable(self.rows[-poll:]))
        for idx, row in reversed(list(zip(self.lines, self.rows))):

            items = set(row)
            # ignore rows with missing values
            if bool(set(exclude).intersection(items)):
                continue

            # check disjoint with observed and completeness
            disjoint = items.isdisjoint(observed)
            complete = len(row) == len(self.rows[-1])

            if not len_requirement:
                # complete is always True if no length requirement
                complete = True

            if disjoint and complete:
                return idx, row

            # add unseen items to observed
            observed.update(items)

        return None, None

    # no mutation of exclude list here
    # pylint: disable-next=dangerous-default-value
    def header(
        self,
        poll: int,
        exclude: List[str] = ['', ' ', '-', 'nan', 'NaN', 'NAN'],
    ) -> Header:
        """Detects the header row (if any) from this Sniffers sample rows.

        Headers are located using one of two possible methods.
            1. If the last row contains mixed types and the last poll rows have
               consistent types, then the first row from the last whose types
               differ from the last row types and whose length matches the last
               row is taken as the header.
            2. If the last poll rows are all string type. The first row from the
               last with string values that have never been seen in the previous
               rows and whose length matches the last row is taken to be the
               header. Caution, the poll amount should be sufficiently large
               enough to sample the possible string values expected in the data
               section. If the header is not correct, consider increasing the
               poll rows parameter.

        Args:
            poll:
                The number of last sample rows to poll for locating the header
                using string or type differences. Poll should be large enough to
                capture many of the string values that appear in the data
                section.
            exclude:
                A sequence of characters that indicate missing values. Rows
                containing these strings will be ignored.

        Notes:
            If no header is detected this method constructs a header. The names
            in this header are of the form 'Column_0', ..., 'Column_{n-1}' where
            n is the expected number of columns from the last row of the sample
            rows.  Just like all other file sniffers, this heuristic will make
            mistakes.  A judicious sample choice that ignores problematic rows
            via the skip parameter may aid detection.

        Returns:
            A Header dataclass instance.
        """

        types, _ = self.types(poll, exclude)
        if all(typ == str for typ in types):
            line, row = self._string_diff(poll, exclude)

        else:
            line, row = self._type_diff(poll, exclude)

        if line is None:
            row = [f'Column_{i}' for i in range(len(self.rows[-1]))]

        # type-narrow for mypy check-- row can no longer be None
        assert isinstance(row, list)
        # get original string if line
        if line is not None:
            # string should include the rows we skipped so use sample not rows
            s = self.sample.splitlines()[self.lines.index(line)]
        else:
            s = None

        return Header(line=line, names=row, string=s)

    # no mutation of exclude list here
    # pylint: disable-next=dangerous-default-value
    def metadata(
        self,
        header: Header | None,
        poll: Optional[int] = None,
        exclude: List[str] = ['', ' ', '-', 'nan', 'NaN', 'NAN'],
    ) -> MetaData:
        """Detects the metadata section (if any) in this Sniffer's sample.

        Args:
            header:
                A Header dataclass instance.
            poll:
                The number of last sample rows to poll for locating metadata by
                length differences if the header arg is None.
            exclude:
                A sequence of characters that indicate missing values. Rows
                containing these strings will be ignored during metadata
                detection. This is ignored if a header is given.

        Returns:
            A MetaData dataclass instance.
        """

        # if header provided get lines up to the header line
        if header and header.line:
            idx = self.lines.index(header.line)
            s = '\n'.join(self.sample.splitlines()[0:idx])
            return MetaData((0, header.line), s)

        if not header and poll is None:
            msg = 'Arguments header and poll cannot both be None type'
            raise ValueError(msg)

        # type narrow poll to int type for mypy
        assert isinstance(poll, int)
        line, _ = self._length_diff(poll, exclude)
        if line is not None:
            metarows = self.sample.splitlines()[: line + 1]
            string = '\n'.join(metarows)
            return MetaData((0, line + 1), string)

        return MetaData((0, None), None)

start property writable

Returns the start line of this Sniffer's sample.

amount property writable

Returns the number of lines in Sniffer's sample.

skips property writable

Returns the skipped lines excluded from this Sniffer's sample.

lines property

Returns a list of integer line numbers comprising the sample.

dialect property writable

Returns this Sniffer's dialect.

rows property

Returns list of sample rows from this Sniffer's sample string.

This method splits the sample string on newlines, strips trailing delimiters and removes double quotes.

Returns:

    List[List[str]]: A list of lists of strings from the sample string.
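
To make the relationship between sample, dialect and rows concrete, here is a small sketch; it assumes a sniffer built as in the class example above, where ';' was the detected delimiter:

    # each sampled line is split on the sniffed delimiter with double quotes removed
    delimiter = sniffer.dialect.delimiter  # ';' in the class example
    for line, row in zip(sniffer.lines, sniffer.rows):
        # line is the original infile line number, row is the list of cell strings
        print(line, row)

    # a roughly equivalent manual split of the raw sample string
    manual = [s.rstrip(delimiter).split(delimiter) for s in sniffer.sample.splitlines()]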

__init__(infile, start=0, amount=100, skips=None, delimiters=[',', ';', '|', '\t'])

Initialize this sniffer.

Parameters:

    infile (IO[str], required):
        An I/O stream instance such as returned by open.
    start (int, default 0):
        The start line of infile for collecting a sample of lines.
    amount (int, default 100):
        The number of infile lines to sample for dialect detection and locating header and metadata positions. The initial value defaults to the smaller of the infile's length or 100 lines.
    skips (Optional[List[int]], default None):
        Line numbers to ignore during sample collection.
    delimiters (List[str] | None, default [',', ';', '|', '\t']):
        A restricted list of delimiter strings for improving dialect detection. If None, any character will be considered a valid delimiter.

Raises:

    StopIteration:
        Raised if start is greater than infile's size.

Notes

Sniffer deviates from Python's Sniffer in that infile is strictly an IO stream, not a list because detecting the metadata and header structures requires movement within the file via 'seek'.
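
A short construction sketch under the same assumptions as the class example; the file name is hypothetical and restricting delimiters simply narrows what the dialect sniff will consider:

    from tabbed.sniffing import Sniffer  # assumed import path

    infile = open('annotated.csv', mode='r')  # hypothetical file with metadata rows
    sniffer = Sniffer(infile, start=0, amount=50, delimiters=[',', '\t'])
    # start, amount and skips are writable properties; setting them resamples the file
    sniffer.amount = 30
    sniffer.skips = [2]
    infile.close()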

Source code in src/tabbed/sniffing.py
def __init__(
    self,
    infile: IO[str],
    start: int = 0,
    amount: int = 100,
    skips: Optional[List[int]] = None,
    delimiters: List[str] | None = [',', ';', '|', '\t'],
) -> None:
    """Initialize this sniffer.

    Args:
        infile:
            An I/O stream instance such as returned by open.
        start:
            The start line of infile for collecting a sample of lines.
        amount:
            The number of infile lines to sample for dialect detection and
            locating header and metadata positions. The initial value defaults
            to the smaller of the infile's length or 100 lines.
        skips:
            Line numbers to ignore during sample collection.
        delimiters:
            A restricted list of delimiter strings for improving dialect
            detection. If None, any character will be considered a valid
            delimiter.

    Raises:
        StopIteration: Raised if start is greater than infile's size.

    Notes:
        Sniffer deviates from Python's Sniffer in that infile is strictly an
        IO stream, not a list because detecting the metadata and header
        structures requires movement within the file via 'seek'.
    """

    self.infile = infile
    self.infile.seek(0)
    self._start = start
    self._amount = amount
    self._skips = skips if skips else []
    # get sample for infile and sniff
    self._resample()
    self.sniff(delimiters)

sniff(delimiters=None)

Returns a clevercsv SimpleDialect from this instance's sample.

Dialect is detected using clevercsv's sniffer, as it has shown improved dialect detection accuracy over Python's built-in csv Sniffer.

Parameters:

    delimiters (Optional[List[str]], default None):
        A string of possibly valid delimiters; see csv.Sniffer.sniff.

Returns:

    None. The sniffed SimpleDialect (see clevercsv.dialect) is stored as this Sniffer's dialect attribute; it is None if sniffing is inconclusive.

References

van den Burg, G.J.J., Nazábal, A. & Sutton, C. Wrangling messy CSV files by detecting row and type patterns. Data Min Knowl Disc 33, 1799–1820 (2019). https://doi.org/10.1007/s10618-019-00646-y
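
When sniffing is inconclusive, the dialect attribute is left as None and a warning is issued. A hedged sketch of setting the dialect by hand, assuming clevercsv's SimpleDialect (which this module already uses) with its (delimiter, quotechar, escapechar) argument order:

    from clevercsv.dialect import SimpleDialect

    sniffer.sniff()
    if sniffer.dialect is None:
        # supply the delimiter, quote character and escape character manually
        sniffer.dialect = SimpleDialect(',', '"', None)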

Source code in src/tabbed/sniffing.py
def sniff(self, delimiters: Optional[List[str]] = None) -> None:
    """Returns a clevercsv SimpleDialect from this instances sample.

    Dialect is detected using clevercsv's sniffer as it has shown improved
    dialect detection accuracy over Python's csv sniffer built-in.

    Args:
        delimiters:
            A string of possibly valid delimiters see csv.Sniffer.sniff.

    Returns:
        A SimpleDialect instance (see clevercsv.dialect) or None if sniffing
        is inconclusive.

    References:
        van den Burg, G.J.J., Nazábal, A. & Sutton, C. Wrangling messy CSV
        files by detecting row and type patterns. Data Min Knowl Disc 33,
        1799–1820 (2019). https://doi.org/10.1007/s10618-019-00646-y
    """

    # result is None if clevercsv's sniff is indeterminate
    result = clevercsv.Sniffer().sniff(self.sample, delimiters=delimiters)
    if result is None:
        msg1 = "Dialect could not be determined from Sniffer's sample.  "
        msg2 = "Please set this Sniffer's dialect attribute."
        warnings.warn(msg1 + msg2)
        self._dialect = None
    else:
        self.dialect = result

types(poll, exclude=['', ' ', '-', 'nan', 'NaN', 'NAN'])

Infer the column types from the last poll count rows.

Parameters:

    poll (int, required):
        The number of last sample rows to poll for type.
    exclude (List[str], default ['', ' ', '-', 'nan', 'NaN', 'NAN']):
        A sequence of characters that indicate missing values. Rows containing these strings will be ignored for type determination.

Returns:

    Tuple[CellTypes, bool]: A list of types and a boolean indicating if types are consistent across polled rows.
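
A brief sketch of interpreting the result, continuing from the class example where the columns were string, integer and string; the widened exclude list is illustrative only:

    types, consistent = sniffer.types(poll=4)
    print(types)       # e.g. [str, int, str]
    print(consistent)  # True when every polled row agrees on each column's type
    if not consistent:
        # poll more rows or treat additional markers as missing values
        types, consistent = sniffer.types(poll=8, exclude=['', ' ', '-', 'NA'])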

Source code in src/tabbed/sniffing.py
def types(
    self,
    poll: int,
    exclude: List[str] = ['', ' ', '-', 'nan', 'NaN', 'NAN'],
) -> Tuple[CellTypes, bool]:
    """Infer the column types from the last poll count rows.

    Args:
        poll:
            The number of last sample rows to poll for type.
        exclude:
            A sequence of characters that indicate missing values. Rows
            containing these strings will be ignored for type determination.

    Returns:
        A list of types and a boolean indicating if types are
        consistent across polled rows.
    """

    rows = self.rows[-poll:]
    rows = [row for row in rows if not bool(set(exclude).intersection(row))]
    if not rows:
        msg = (
            f'Types could not be determined as last {poll} polling '
            f'rows all contained at least one exclusion {exclude}. Try '
            'increasing the number of polling rows.'
        )
        raise RuntimeError(msg)

    cols = list(zip(*rows))
    type_cnts = [
        Counter([type(parsing.convert(el)) for el in col]) for col in cols
    ]
    consistent = all(len(cnt) == 1 for cnt in type_cnts)
    common_types = [cnt.most_common(1)[0][0] for cnt in type_cnts]

    return common_types, consistent

datetime_formats(poll, exclude=['', ' ', '-', 'nan', 'NaN', 'NAN'])

Infer time, date or datetime formats from last poll count rows.

Parameters:

    poll (int, required):
        The number of last sample rows to poll for type and format consistency.
    exclude (List[str], default ['', ' ', '-', 'nan', 'NaN', 'NAN']):
        A sequence of characters that indicate missing values. Rows containing these strings will be ignored.

Returns:

    Tuple[List[str | None], bool]: A tuple containing a list of formats the same length as the last polled row and a boolean indicating if the formats are consistent across the polled rows. Columns that are not time, date or datetime type have a format of None.
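
A sketch of reading the result for a hypothetical file whose first column holds dates; the format string shown is only an example of what the parsing helpers might report:

    formats, consistent = sniffer.datetime_formats(poll=10)
    # e.g. ['%m/%d/%Y', None, None] for a date column followed by two non-date columns
    for index, fmt in enumerate(formats):
        if fmt is not None:
            print(f'column {index} parses with format {fmt}')
    if not consistent:
        print('polled rows disagree on a format; consider polling more rows')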

Source code in src/tabbed/sniffing.py
def datetime_formats(
    self,
    poll: int,
    exclude: List[str] = ['', ' ', '-', 'nan', 'NaN', 'NAN'],
) -> Tuple[List[str | None], bool]:
    """Infer time, date or datetime formats from last poll count rows.

    Args:
        poll:
            The number of last sample rows to poll for type and format
            consistency.

    Returns:
        A tuple containing a list of formats the same length as last polled
        row and a boolean indicating if the formats are consistent across
        the polled rows. Columns that are not time, date or datetime type
        have a format of None.
    """

    fmts = {
        time: parsing.time_formats(),
        date: parsing.date_formats(),
        datetime: parsing.datetime_formats(),
    }
    polled = []
    for row in self.rows[-poll:]:
        row_fmts = []
        for astring, tp in zip(row, self.types(poll, exclude)[0]):
            fmt = (
                parsing.find_format(astring, fmts[tp])
                if tp in fmts
                else None
            )
            row_fmts.append(fmt)
        polled.append(row_fmts)

    # consistency within each column of polled
    consistent = all(len(set(col)) == 1 for col in list(zip(*polled)))

    return polled[-1], consistent

header(poll, exclude=['', ' ', '-', 'nan', 'NaN', 'NAN'])

Detects the header row (if any) from this Sniffer's sample rows.

Headers are located using one of two possible methods:

1. If the last row contains mixed types and the last poll rows have consistent types, then the first row from the last whose types differ from the last row's types and whose length matches the last row is taken as the header.
2. If the last poll rows are all string type, the first row from the last with string values never seen in the previous rows and whose length matches the last row is taken to be the header. Caution: the poll amount should be large enough to sample the possible string values expected in the data section. If the header is not correct, consider increasing the poll rows parameter.

Parameters:

    poll (int, required):
        The number of last sample rows to poll for locating the header using string or type differences. Poll should be large enough to capture many of the string values that appear in the data section.
    exclude (List[str], default ['', ' ', '-', 'nan', 'NaN', 'NAN']):
        A sequence of characters that indicate missing values. Rows containing these strings will be ignored.

Notes

If no header is detected this method constructs a header. The names in this header are of the form 'Column_0', ..., 'Column_{n-1}', where n is the expected number of columns in the last row of the sample rows. Just like all other file sniffers, this heuristic will make mistakes. A judicious sample choice that ignores problematic rows via the skip parameter may aid detection.

Returns:

    Header: A Header dataclass instance.
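
A sketch of the fallback described in the Notes: when no header row is detected, a Header is still returned with generated names and a line of None (continuing with the sniffer from the class example):

    header = sniffer.header(poll=4)
    if header.line is None:
        # no header row found; names are generated from the width of the last sample row
        print(header.names)  # e.g. ['Column_0', 'Column_1', 'Column_2']
    else:
        print(header.line, header.names, header.string)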

Source code in src/tabbed/sniffing.py
def header(
    self,
    poll: int,
    exclude: List[str] = ['', ' ', '-', 'nan', 'NaN', 'NAN'],
) -> Header:
    """Detects the header row (if any) from this Sniffers sample rows.

    Headers are located using one of two possible methods.
        1. If the last row contains mixed types and the last poll rows have
           consistent types, then the first row from the last whose types
           differ from the last row types and whose length matches the last
           row is taken as the header.
        2. If the last poll rows are all string type. The first row from the
           last with string values that have never been seen in the previous
           rows and whose length matches the last row is taken to be the
           header. Caution, the poll amount should be sufficiently large
           enough to sample the possible string values expected in the data
           section. If the header is not correct, consider increasing the
           poll rows parameter.

    Args:
        poll:
            The number of last sample rows to poll for locating the header
            using string or type differences. Poll should be large enough to
            capture many of the string values that appear in the data
            section.
        exclude:
            A sequence of characters that indicate missing values. Rows
            containing these strings will be ignored.

    Notes:
        If no header is detected this method constructs a header. The names
        in this header are of the form 'Column_0', ..., 'Column_{n-1}' where
        n is the expected number of columns from the last row of the sample
        rows.  Just like all other file sniffers, this heuristic will make
        mistakes.  A judicious sample choice that ignores problematic rows
        via the skip parameter may aid detection.

    Returns:
        A Header dataclass instance.
    """

    types, _ = self.types(poll, exclude)
    if all(typ == str for typ in types):
        line, row = self._string_diff(poll, exclude)

    else:
        line, row = self._type_diff(poll, exclude)

    if line is None:
        row = [f'Column_{i}' for i in range(len(self.rows[-1]))]

    # type-narrow for mypy check-- row can no longer be None
    assert isinstance(row, list)
    # get original string if line
    if line is not None:
        # string should include the rows we skipped so use sample not rows
        s = self.sample.splitlines()[self.lines.index(line)]
    else:
        s = None

    return Header(line=line, names=row, string=s)

metadata(header, poll=None, exclude=['', ' ', '-', 'nan', 'NaN', 'NAN'])

Detects the metadata section (if any) in this Sniffer's sample.

Parameters:

    header (Header | None, required):
        A Header dataclass instance.
    poll (Optional[int], default None):
        The number of last sample rows to poll for locating metadata by length differences if the header arg is None.
    exclude (List[str], default ['', ' ', '-', 'nan', 'NaN', 'NAN']):
        A sequence of characters that indicate missing values. Rows containing these strings will be ignored during metadata detection. This is ignored if a header is given.

Returns:

    MetaData: A MetaData dataclass instance.
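
A sketch of the two ways to call this method: with a previously detected Header, or with header=None and a poll count so metadata is located by row-length differences:

    # with a detected header, everything above the header line is metadata
    meta = sniffer.metadata(header)
    print(meta.lines, meta.string)

    # without a header, fall back to length-difference detection
    meta = sniffer.metadata(None, poll=4)
    if meta.string is None:
        print('no metadata section detected')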

Source code in src/tabbed/sniffing.py
def metadata(
    self,
    header: Header | None,
    poll: Optional[int] = None,
    exclude: List[str] = ['', ' ', '-', 'nan', 'NaN', 'NAN'],
) -> MetaData:
    """Detects the metadata section (if any) in this Sniffer's sample.

    Args:
        header:
            A Header dataclass instance.
        poll:
            The number of last sample rows to poll for locating metadata by
            length differences if the header arg is None.
        exclude:
            A sequence of characters that indicate missing values. Rows
            containing these strings will be ignored during metadata
            detection. This is ignored if a header is given.

    Returns:
        A MetaData dataclass instance.
    """

    # if header provided get lines up to the header line
    if header and header.line:
        idx = self.lines.index(header.line)
        s = '\n'.join(self.sample.splitlines()[0:idx])
        return MetaData((0, header.line), s)

    if not header and poll is None:
        msg = 'Arguments header and poll cannot both be None type'
        raise ValueError(msg)

    # type narrow poll to int type for mypy
    assert isinstance(poll, int)
    line, _ = self._length_diff(poll, exclude)
    if line is not None:
        metarows = self.sample.splitlines()[: line + 1]
        string = '\n'.join(metarows)
        return MetaData((0, line + 1), string)

    return MetaData((0, None), None)

tabbed.sniffing.Header dataclass

An immutable dataclass representation of a text file's header.

Attributes:

    line (int | None):
        The integer line number of this Header. If None, the header was not derived from a file.
    names (List[str]):
        The string names of each of the columns comprising the header. If these names contain spaces or repeat, this representation automatically amends them.
    string (str | None):
        The original string that was split to create header names. If None, the names were not derived from a file.

Source code in src/tabbed/sniffing.py
@dataclass(frozen=True)
class Header:
    """An immutable dataclass representation of a text file's header.

    Attributes:
        line:
            The integer line number of this Header. If None, the header was not
            derived from a file.
        names:
            The string names of each of the columns comprising the header. If
            these names contain spaces or repeat, this representation
            automatically amends them.
        string:
            The original string that was split to create header names.  If None,
            the names were not derived from a file.
    """

    line: int | None
    names: List[str]
    string: str | None

    def __post_init__(self) -> None:
        """Amend the names during initialization."""

        # relabel the names to replace spaces, repeats etc.
        names = self._amend()
        super().__setattr__('names', names)

    def _amend(self):
        """Ensures header names have no spaces and are unique.

        Header names may not have spaces. This function replaces spaces with
        underscores. Header names must be unique. This function adds an
        underscore plus an integer to names that repeat.
        """

        # replace any blank chars with underscores
        names = [name.strip().replace(' ', '_') for name in self.names]

        # replace repeating names with name_i variants for i in [0, inf)
        counted = Counter(names)
        mapping = {
            name: (
                [name] if cnt < 2 else [name + '_' + str(v) for v in range(cnt)]
            )
            for name, cnt in counted.items()
        }

        result = [mapping[name].pop(0) for name in names]
        return result

__post_init__()

Amend the names during initialization.

Source code in src/tabbed/sniffing.py
def __post_init__(self) -> None:
    """Amend the names during initialization."""

    # relabel the names to replace spaces, repeats etc.
    names = self._amend()
    super().__setattr__('names', names)

_amend()

Ensures header names have no spaces and are unique.

Header names may not have spaces. This function replaces spaces with underscores. Header names must be unique. This function adds an underscore plus an integer to names that repeat.

Source code in src/tabbed/sniffing.py
def _amend(self):
    """Ensures header names have no spaces and are unique.

    Header names may not have spaces. This function replaces spaces with
    underscores. Header names must be unique. This function adds an
    underscore plus an integer to names that repeat.
    """

    # replace any blank chars with underscores
    names = [name.strip().replace(' ', '_') for name in self.names]

    # replace repeating names with name_i variants for i in [0, inf)
    counted = Counter(names)
    mapping = {
        name: (
            [name] if cnt < 2 else [name + '_' + str(v) for v in range(cnt)]
        )
        for name, cnt in counted.items()
    }

    result = [mapping[name].pop(0) for name in names]
    return result
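
A short sketch of the amending behaviour above: spaces become underscores and repeated names receive integer suffixes (the import path is assumed from the module name):

    from tabbed.sniffing import Header  # assumed import path

    header = Header(line=None, names=['group id', 'count', 'count'], string=None)
    print(header.names)  # ['group_id', 'count_0', 'count_1']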

tabbed.sniffing.MetaData dataclass

An immutable dataclass representing a text file's metadata section.

Attributes:

    lines (Tuple[int, int | None]):
        A 2-tuple of start and stop of file lines containing metadata. If None, the file does not contain a metadata section.
    string (str | None):
        The string of metadata with no conversion read from file instance. If None, the file does not contain a metadata section.

Source code in src/tabbed/sniffing.py
@dataclass(frozen=True)
class MetaData:
    """An immutable dataclass representing a text file's metadata section.

    Attributes:
        lines:
            A 2-tuple of start and stop of file lines containing metadata. If
            None, the file does not contain a metadata section.
        string:
            The string of metadata with no conversion read from file instance.
            If None, the file does not contain a metadata section.
    """

    lines: Tuple[int, int | None]
    string: str | None
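
A sketch of turning a MetaData string back into key/value pairs, assuming the metadata rows share the sniffed delimiter as they do in the class example above:

    meta = sniffer.metadata(header)
    if meta.string is not None:
        delimiter = sniffer.dialect.delimiter
        pairs = dict(
            line.split(delimiter, 1)  # split each metadata line once on the delimiter
            for line in meta.string.splitlines()
            if delimiter in line
        )
        print(pairs)  # e.g. {'exp': '3', 'name': 'Paul Dirac', 'date': '11/09/1942'}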