1 | use std::cmp; |
2 | |
3 | use memchr::memchr2_iter; |
4 | use polars_buffer::Buffer; |
5 | use polars_core::POOL; |
6 | use polars_core::prelude::*; |
7 | use polars_error::feature_gated; |
8 | use polars_utils::mmap::MMapSemaphore; |
9 | use polars_utils::pl_path::PlRefPath; |
10 | use polars_utils::select::select_unpredictable; |
11 | use rayon::prelude::*; |
12 | |
13 | use super::CsvParseOptions; |
14 | use super::builder::Builder; |
15 | use super::options::{CommentPrefix, NullValuesCompiled}; |
16 | use super::splitfields::SplitFields; |
17 | use crate::csv::read::read_until_start_and_infer_schema; |
18 | use crate::prelude::CsvReadOptions; |
19 | use crate::utils::compression::CompressedReader; |
20 | |
/// Read the number of rows without parsing columns.
/// Useful for `count(*)` queries.
23 | #[allow(clippy::too_many_arguments)] |
24 | pub fn count_rows( |
25 | path: PlRefPath, |
26 | quote_char: Option<u8>, |
27 | comment_prefix: Option<&CommentPrefix>, |
28 | eol_char: u8, |
29 | has_header: bool, |
30 | skip_lines: usize, |
31 | skip_rows_before_header: usize, |
32 | skip_rows_after_header: usize, |
33 | ) -> PolarsResult<usize> { |
34 | let file = if path.has_scheme() || polars_config::config().force_async() { |
35 | feature_gated!("cloud", { |
36 | crate::file_cache::FILE_CACHE |
37 | .get_entry(path) |
38 | // Safety: This was initialized by schema inference. |
39 | .unwrap() |
40 | .try_open_assume_latest()? |
41 | }) |
42 | } else { |
43 | polars_utils::open_file(path.as_std_path())? |
44 | }; |
45 | |
46 | let mmap = MMapSemaphore::new_from_file(&file).unwrap(); |
47 | |
48 | count_rows_from_slice_par( |
49 | Buffer::from_owner(mmap), |
50 | quote_char, |
51 | comment_prefix, |
52 | eol_char, |
53 | has_header, |
54 | skip_lines, |
55 | skip_rows_before_header, |
56 | skip_rows_after_header, |
57 | ) |
58 | } |
59 | |
/// Read the number of rows without parsing columns.
/// Useful for `count(*)` queries.
62 | #[allow(clippy::too_many_arguments)] |
63 | pub fn count_rows_from_slice_par( |
64 | buffer: Buffer<u8>, |
65 | quote_char: Option<u8>, |
66 | comment_prefix: Option<&CommentPrefix>, |
67 | eol_char: u8, |
68 | has_header: bool, |
69 | skip_lines: usize, |
70 | skip_rows_before_header: usize, |
71 | skip_rows_after_header: usize, |
72 | ) -> PolarsResult<usize> { |
73 | let mut reader = CompressedReader::try_new(buffer)?; |
74 | |
75 | let reader_options = CsvReadOptions { |
76 | parse_options: Arc::new(CsvParseOptions { |
77 | quote_char, |
78 | comment_prefix: comment_prefix.cloned(), |
79 | eol_char, |
80 | ..Default::default() |
81 | }), |
82 | has_header, |
83 | skip_lines, |
84 | skip_rows: skip_rows_before_header, |
85 | skip_rows_after_header, |
86 | ..Default::default() |
87 | }; |
88 | |
89 | let (_, mut leftover) = |
90 | read_until_start_and_infer_schema(&reader_options, None, None, &mut reader)?; |
91 | |
92 | const BYTES_PER_CHUNK: usize = if cfg!(debug_assertions) { |
93 | 128 |
94 | } else { |
95 | 512 * 1024 |
96 | }; |
97 | |
98 | let count = CountLines::new(quote_char, eol_char, comment_prefix.cloned()); |
99 | POOL.install(|| { |
100 | let mut states = Vec::new(); |
101 | let eof_unterminated_row; |
102 | |
103 | if comment_prefix.is_none() { |
104 | let mut last_slice = Buffer::new(); |
105 | let mut err = None; |
106 | |
107 | let streaming_iter = std::iter::from_fn(|| { |
108 | let (slice, read_n) = match reader.read_next_slice(&leftover, BYTES_PER_CHUNK) { |
109 | Ok(tup) => tup, |
110 | Err(e) => { |
111 | err = Some(e); |
112 | return None; |
113 | }, |
114 | }; |
115 | |
116 | leftover = Buffer::new(); |
117 | if slice.is_empty() && read_n == 0 { |
118 | return None; |
119 | } |
120 | |
121 | last_slice = slice.clone(); |
122 | Some(slice) |
123 | }); |
124 | |
125 | states = streaming_iter |
126 | .enumerate() |
127 | .par_bridge() |
128 | .map(|(id, slice)| (count.analyze_chunk(&slice), id)) |
129 | .collect::<Vec<_>>(); |
130 | |
131 | if let Some(e) = err { |
132 | return Err(e.into()); |
133 | } |
134 | |
            // `par_bridge` does not guarantee order, but the output is mostly
            // sorted, so a plain `slice::sort` is a decent fit.
137 | states.sort_by_key(|(_, id)| *id); |
138 | |
139 | // Technically this is broken if the input has a comment line at the end that is longer |
140 | // than `BYTES_PER_CHUNK`, but in practice this ought to be fine. |
141 | eof_unterminated_row = ends_in_unterminated_row(&last_slice, eol_char, comment_prefix); |
142 | } else { |
143 | // For the non-compressed case this is a zero-copy op. |
144 | // TODO: Implement streaming chunk logic. |
145 | let (bytes, _) = reader.read_next_slice(&leftover, usize::MAX)?; |
146 | |
147 | let num_chunks = bytes.len().div_ceil(BYTES_PER_CHUNK); |
148 | (0..num_chunks) |
149 | .into_par_iter() |
150 | .map(|chunk_idx| { |
151 | let mut start_offset = chunk_idx * BYTES_PER_CHUNK; |
152 | let next_start_offset = (start_offset + BYTES_PER_CHUNK).min(bytes.len()); |
153 | |
154 | if start_offset != 0 { |
155 | // Ensure we start at the start of a line. |
156 | if let Some(nl_off) = bytes[start_offset..next_start_offset] |
157 | .iter() |
158 | .position(|b| *b == eol_char) |
159 | { |
160 | start_offset += nl_off + 1; |
161 | } else { |
162 | return (count.analyze_chunk(&[]), 0); |
163 | } |
164 | } |
165 | |
166 | let stop_offset = if let Some(nl_off) = bytes[next_start_offset..] |
167 | .iter() |
168 | .position(|b| *b == eol_char) |
169 | { |
170 | next_start_offset + nl_off + 1 |
171 | } else { |
172 | bytes.len() |
173 | }; |
174 | |
175 | (count.analyze_chunk(&bytes[start_offset..stop_offset]), 0) |
176 | }) |
177 | .collect_into_vec(&mut states); |
178 | |
179 | eof_unterminated_row = ends_in_unterminated_row(&bytes, eol_char, comment_prefix); |
180 | } |
181 | |
182 | let mut n = 0; |
183 | let mut in_string = false; |
184 | for (pair, _) in states { |
185 | n += pair[in_string as usize].newline_count; |
186 | in_string = pair[in_string as usize].end_inside_string; |
187 | } |
188 | n += eof_unterminated_row as usize; |
189 | |
190 | Ok(n) |
191 | }) |
192 | } |
193 | |
194 | /// Checks if a line in a CSV file is a comment based on the given comment prefix configuration. |
195 | /// |
196 | /// This function is used during CSV parsing to determine whether a line should be ignored based on its starting characters. |
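///
/// A small illustrative sketch of the expected behaviour (assuming a
/// single-byte `#` prefix):
///
/// ```text
/// "# a comment"      with Some(CommentPrefix::Single(b'#')) -> true
/// "field_1,field_2"  with Some(CommentPrefix::Single(b'#')) -> false
/// "# a comment"      with None                              -> false
/// ```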
197 | #[inline] |
198 | pub fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool { |
199 | match comment_prefix { |
200 | Some(CommentPrefix::Single(c)) => line.first() == Some(c), |
201 | Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()), |
202 | None => false, |
203 | } |
204 | } |
205 | |
206 | /// Find the nearest next line position. |
207 | /// Does not check for new line characters embedded in String fields. |
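///
/// A short sketch of the return value:
///
/// ```text
/// next_line_position_naive(b"a,b\nc,d\n", b'\n') -> Some(4)
/// next_line_position_naive(b"a,b\n",      b'\n') -> None  // the EOL terminates the input
/// ```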
208 | pub(super) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> { |
209 | let pos = memchr::memchr(eol_char, input)? + 1; |
210 | if input.len() - pos == 0 { |
211 | return None; |
212 | } |
213 | Some(pos) |
214 | } |
215 | |
216 | /// Find the nearest next line position that is not embedded in a String field. |
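///
/// An illustrative sketch (comma separator, no quoting, `\n` EOL and
/// `expected_fields = Some(2)`): for the input `b"d,e\n1,2\n3,4\n"` this
/// returns `Some(4)`, i.e. the offset where the line `1,2` starts.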
217 | pub(super) fn next_line_position( |
218 | mut input: &[u8], |
219 | mut expected_fields: Option<usize>, |
220 | separator: u8, |
221 | quote_char: Option<u8>, |
222 | eol_char: u8, |
223 | ) -> Option<usize> { |
224 | fn accept_line( |
225 | line: &[u8], |
226 | expected_fields: usize, |
227 | separator: u8, |
228 | eol_char: u8, |
229 | quote_char: Option<u8>, |
230 | ) -> bool { |
231 | let mut count = 0usize; |
232 | for (field, _) in SplitFields::new(line, separator, quote_char, eol_char) { |
233 | if memchr2_iter(separator, eol_char, field).count() >= expected_fields { |
234 | return false; |
235 | } |
236 | count += 1; |
237 | } |
238 | |
        // If the last field is missing, e.g.:
        // a,b,c
        // vala,valb,
        // then SplitFields returns a count that is one less than expected.
        // Therefore we accept both:
        //   count == expected_fields
        // and
        //   count == expected_fields - 1
248 | expected_fields.wrapping_sub(count) <= 1 |
249 | } |
250 | |
    // We check 3 subsequent lines with `accept_line` before we accept a position.
    // If 3 line groups are rejected, we reject completely.
253 | let mut rejected_line_groups = 0u8; |
254 | |
255 | let mut total_pos = 0; |
256 | if input.is_empty() { |
257 | return None; |
258 | } |
259 | let mut lines_checked = 0u8; |
260 | loop { |
261 | if rejected_line_groups >= 3 { |
262 | return None; |
263 | } |
264 | lines_checked = lines_checked.wrapping_add(1); |
        // Headers might have an extra value, so if we have churned through
        // enough lines we try with one field less.
268 | if lines_checked == u8::MAX { |
269 | if let Some(ef) = expected_fields { |
270 | expected_fields = Some(ef.saturating_sub(1)) |
271 | } |
272 | }; |
273 | let pos = memchr::memchr(eol_char, input)? + 1; |
274 | if input.len() - pos == 0 { |
275 | return None; |
276 | } |
277 | debug_assert!(pos <= input.len()); |
278 | let new_input = unsafe { input.get_unchecked(pos..) }; |
279 | let mut lines = SplitLines::new(new_input, quote_char, eol_char, None); |
280 | let line = lines.next(); |
281 | |
282 | match (line, expected_fields) { |
283 | // count the fields, and determine if they are equal to what we expect from the schema |
284 | (Some(line), Some(expected_fields)) => { |
285 | if accept_line(line, expected_fields, separator, eol_char, quote_char) { |
286 | let mut valid = true; |
287 | for line in lines.take(2) { |
288 | if !accept_line(line, expected_fields, separator, eol_char, quote_char) { |
289 | valid = false; |
290 | break; |
291 | } |
292 | } |
293 | if valid { |
294 | return Some(total_pos + pos); |
295 | } else { |
296 | rejected_line_groups += 1; |
297 | } |
298 | } else { |
299 | debug_assert!(pos < input.len()); |
300 | unsafe { |
301 | input = input.get_unchecked(pos + 1..); |
302 | } |
303 | total_pos += pos + 1; |
304 | } |
305 | }, |
306 | // don't count the fields |
307 | (Some(_), None) => return Some(total_pos + pos), |
            // No new line found.
309 | _ => return None, |
310 | } |
311 | } |
312 | } |
313 | |
314 | #[inline(always)] |
315 | pub(super) fn is_whitespace(b: u8) -> bool { |
316 | b == b' ' || b == b'\t' |
317 | } |
318 | |
/// May have false positives, but no false negatives.
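///
/// A small sketch of the classification:
///
/// ```text
/// could_be_whitespace_fast(b' ')  -> true   (actual whitespace)
/// could_be_whitespace_fast(b'\n') -> true   (false positive)
/// could_be_whitespace_fast(b'a')  -> false
/// ```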
320 | #[inline(always)] |
321 | pub(super) fn could_be_whitespace_fast(b: u8) -> bool { |
322 | // We're interested in \t (ASCII 9) and " " (ASCII 32), both of which are |
323 | // <= 32. In that range there aren't a lot of other common symbols (besides |
324 | // newline), so this is a quick test which can be worth doing to avoid the |
325 | // exact test. |
326 | b <= 32 |
327 | } |
328 | |
329 | #[inline] |
330 | fn skip_condition<F>(input: &[u8], f: F) -> &[u8] |
331 | where |
332 | F: Fn(u8) -> bool, |
333 | { |
334 | if input.is_empty() { |
335 | return input; |
336 | } |
337 | |
338 | let read = input.iter().position(|b| !f(*b)).unwrap_or(input.len()); |
339 | &input[read..] |
340 | } |
341 | |
/// Remove whitespace (spaces and tabs) from the start of the buffer.
/// Makes sure that the byte stream starts with
/// 'field_1,field_2'
/// and not with
/// '  field_1,field_2'
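///
/// A minimal sketch:
///
/// ```text
/// skip_whitespace(b"  \tfield_1,field_2") -> b"field_1,field_2"
/// ```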
347 | #[inline] |
348 | pub(super) fn skip_whitespace(input: &[u8]) -> &[u8] { |
349 | skip_condition(input, is_whitespace) |
350 | } |
351 | |
352 | /// An adapted version of std::iter::Split. |
353 | /// This exists solely because we cannot split the file in lines naively as |
354 | /// |
355 | /// ```text |
356 | /// for line in bytes.split(b'\n') { |
357 | /// ``` |
358 | /// |
/// This will fail when string fields have embedded end-of-line characters.
/// For instance: "This is a valid field\nI have multiple lines" is a single valid string field that contains multiple lines.
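///
/// `SplitLines` instead only splits on end-of-line characters that are not
/// enclosed in quotes. A sketch of the behaviour (also exercised in the test
/// module below):
///
/// ```text
/// input: 1,"foo\n"\n2,"foo\n"\n
/// lines: ["1,\"foo\n\"", "2,\"foo\n\""]
/// ```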
361 | pub struct SplitLines<'a> { |
362 | v: &'a [u8], |
363 | quote_char: u8, |
364 | eol_char: u8, |
365 | #[cfg(feature = "simd")] |
366 | simd_eol_char: SimdVec, |
367 | #[cfg(feature = "simd")] |
368 | simd_quote_char: SimdVec, |
369 | #[cfg(feature = "simd")] |
370 | previous_valid_eols: u64, |
371 | total_index: usize, |
372 | quoting: bool, |
373 | comment_prefix: Option<&'a CommentPrefix>, |
374 | } |
375 | |
376 | #[cfg(feature = "simd")] |
377 | const SIMD_SIZE: usize = 64; |
378 | #[cfg(feature = "simd")] |
379 | use std::simd::prelude::*; |
380 | |
381 | #[cfg(feature = "simd")] |
382 | use polars_utils::clmul::prefix_xorsum_inclusive; |
383 | |
384 | #[cfg(feature = "simd")] |
385 | type SimdVec = u8x64; |
386 | |
387 | impl<'a> SplitLines<'a> { |
388 | pub fn new( |
389 | slice: &'a [u8], |
390 | quote_char: Option<u8>, |
391 | eol_char: u8, |
392 | comment_prefix: Option<&'a CommentPrefix>, |
393 | ) -> Self { |
394 | let quoting = quote_char.is_some(); |
395 | let quote_char = quote_char.unwrap_or(b'\"'); |
396 | #[cfg(feature = "simd")] |
397 | let simd_eol_char = SimdVec::splat(eol_char); |
398 | #[cfg(feature = "simd")] |
399 | let simd_quote_char = SimdVec::splat(quote_char); |
400 | Self { |
401 | v: slice, |
402 | quote_char, |
403 | eol_char, |
404 | #[cfg(feature = "simd")] |
405 | simd_eol_char, |
406 | #[cfg(feature = "simd")] |
407 | simd_quote_char, |
408 | #[cfg(feature = "simd")] |
409 | previous_valid_eols: 0, |
410 | total_index: 0, |
411 | quoting, |
412 | comment_prefix, |
413 | } |
414 | } |
415 | } |
416 | |
417 | impl<'a> SplitLines<'a> { |
418 | // scalar as in non-simd |
419 | fn next_scalar(&mut self) -> Option<&'a [u8]> { |
420 | if self.v.is_empty() { |
421 | return None; |
422 | } |
423 | if is_comment_line(self.v, self.comment_prefix) { |
424 | return self.next_comment_line(); |
425 | } |
426 | { |
427 | let mut pos = 0u32; |
428 | let mut iter = self.v.iter(); |
429 | let mut in_field = false; |
430 | loop { |
431 | match iter.next() { |
432 | Some(&c) => { |
433 | pos += 1; |
434 | |
435 | if self.quoting && c == self.quote_char { |
436 | // toggle between string field enclosure |
437 | // if we encounter a starting '"' -> in_field = true; |
438 | // if we encounter a closing '"' -> in_field = false; |
439 | in_field = !in_field; |
440 | } |
441 | // if we are not in a string and we encounter '\n' we can stop at this position. |
442 | else if c == self.eol_char && !in_field { |
443 | break; |
444 | } |
445 | }, |
446 | None => { |
447 | let remainder = self.v; |
448 | self.v = &[]; |
449 | return Some(remainder); |
450 | }, |
451 | } |
452 | } |
453 | |
454 | unsafe { |
455 | debug_assert!((pos as usize) <= self.v.len()); |
456 | |
457 | // return line up to this position |
458 | let ret = Some( |
459 | self.v |
460 | .get_unchecked(..(self.total_index + pos as usize - 1)), |
461 | ); |
462 | // skip the '\n' token and update slice. |
463 | self.v = self.v.get_unchecked(self.total_index + pos as usize..); |
464 | ret |
465 | } |
466 | } |
467 | } |
468 | fn next_comment_line(&mut self) -> Option<&'a [u8]> { |
469 | if let Some(pos) = next_line_position_naive(self.v, self.eol_char) { |
470 | unsafe { |
471 | // return line up to this position |
472 | let ret = Some(self.v.get_unchecked(..(pos - 1))); |
473 | // skip the '\n' token and update slice. |
474 | self.v = self.v.get_unchecked(pos..); |
475 | ret |
476 | } |
477 | } else { |
478 | let remainder = self.v; |
479 | self.v = &[]; |
480 | Some(remainder) |
481 | } |
482 | } |
483 | } |
484 | |
485 | impl<'a> Iterator for SplitLines<'a> { |
486 | type Item = &'a [u8]; |
487 | |
488 | #[inline] |
489 | #[cfg(not(feature = "simd"))] |
490 | fn next(&mut self) -> Option<&'a [u8]> { |
491 | self.next_scalar() |
492 | } |
493 | |
494 | #[inline] |
495 | #[cfg(feature = "simd")] |
496 | fn next(&mut self) -> Option<&'a [u8]> { |
497 | // First check cached value |
498 | if self.previous_valid_eols != 0 { |
499 | let pos = self.previous_valid_eols.trailing_zeros() as usize; |
500 | self.previous_valid_eols >>= (pos + 1) as u64; |
501 | |
502 | unsafe { |
503 | debug_assert!((pos) <= self.v.len()); |
504 | |
505 | // return line up to this position |
506 | let ret = Some(self.v.get_unchecked(..pos)); |
507 | // skip the '\n' token and update slice. |
508 | self.v = self.v.get_unchecked(pos + 1..); |
509 | return ret; |
510 | } |
511 | } |
512 | if self.v.is_empty() { |
513 | return None; |
514 | } |
515 | if self.comment_prefix.is_some() { |
516 | return self.next_scalar(); |
517 | } |
518 | |
519 | self.total_index = 0; |
520 | let mut not_in_field_previous_iter = true; |
521 | |
522 | loop { |
523 | let bytes = unsafe { self.v.get_unchecked(self.total_index..) }; |
524 | if bytes.len() > SIMD_SIZE { |
525 | let lane: [u8; SIMD_SIZE] = unsafe { |
526 | bytes |
527 | .get_unchecked(0..SIMD_SIZE) |
528 | .try_into() |
529 | .unwrap_unchecked() |
530 | }; |
531 | let simd_bytes = SimdVec::from(lane); |
532 | let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask(); |
533 | |
534 | let valid_eols = if self.quoting { |
535 | let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask(); |
536 | let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask); |
537 | |
538 | if not_in_field_previous_iter { |
539 | not_in_quote_field = !not_in_quote_field; |
540 | } |
541 | not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0; |
542 | eol_mask & not_in_quote_field |
543 | } else { |
544 | eol_mask |
545 | }; |
546 | |
547 | if valid_eols != 0 { |
548 | let pos = valid_eols.trailing_zeros() as usize; |
549 | if pos == SIMD_SIZE - 1 { |
550 | self.previous_valid_eols = 0; |
551 | } else { |
552 | self.previous_valid_eols = valid_eols >> (pos + 1) as u64; |
553 | } |
554 | |
555 | unsafe { |
556 | let pos = self.total_index + pos; |
557 | debug_assert!((pos) <= self.v.len()); |
558 | |
559 | // return line up to this position |
560 | let ret = Some(self.v.get_unchecked(..pos)); |
561 | // skip the '\n' token and update slice. |
562 | self.v = self.v.get_unchecked(pos + 1..); |
563 | return ret; |
564 | } |
565 | } else { |
566 | self.total_index += SIMD_SIZE; |
567 | } |
568 | } else { |
569 | // Denotes if we are in a string field, started with a quote |
570 | let mut in_field = !not_in_field_previous_iter; |
571 | let mut pos = 0u32; |
572 | let mut iter = bytes.iter(); |
573 | loop { |
574 | match iter.next() { |
575 | Some(&c) => { |
576 | pos += 1; |
577 | |
578 | if self.quoting && c == self.quote_char { |
579 | // toggle between string field enclosure |
580 | // if we encounter a starting '"' -> in_field = true; |
581 | // if we encounter a closing '"' -> in_field = false; |
582 | in_field = !in_field; |
583 | } |
584 | // if we are not in a string and we encounter '\n' we can stop at this position. |
585 | else if c == self.eol_char && !in_field { |
586 | break; |
587 | } |
588 | }, |
589 | None => { |
590 | let remainder = self.v; |
591 | self.v = &[]; |
592 | return Some(remainder); |
593 | }, |
594 | } |
595 | } |
596 | |
597 | unsafe { |
598 | debug_assert!((pos as usize) <= self.v.len()); |
599 | |
600 | // return line up to this position |
601 | let ret = Some( |
602 | self.v |
603 | .get_unchecked(..(self.total_index + pos as usize - 1)), |
604 | ); |
605 | // skip the '\n' token and update slice. |
606 | self.v = self.v.get_unchecked(self.total_index + pos as usize..); |
607 | return ret; |
608 | } |
609 | } |
610 | } |
611 | } |
612 | } |
613 | |
614 | pub struct CountLines { |
615 | quote_char: u8, |
616 | eol_char: u8, |
617 | #[cfg(feature = "simd")] |
618 | simd_eol_char: SimdVec, |
619 | #[cfg(feature = "simd")] |
620 | simd_quote_char: SimdVec, |
621 | quoting: bool, |
622 | comment_prefix: Option<CommentPrefix>, |
623 | } |
624 | |
625 | #[derive(Copy, Clone, Debug, Default)] |
626 | pub struct LineStats { |
627 | pub newline_count: usize, |
628 | pub last_newline_offset: usize, |
629 | pub end_inside_string: bool, |
630 | } |
631 | |
632 | impl CountLines { |
633 | pub fn new( |
634 | quote_char: Option<u8>, |
635 | eol_char: u8, |
636 | comment_prefix: Option<CommentPrefix>, |
637 | ) -> Self { |
638 | let quoting = quote_char.is_some(); |
639 | let quote_char = quote_char.unwrap_or(b'\"'); |
640 | #[cfg(feature = "simd")] |
641 | let simd_eol_char = SimdVec::splat(eol_char); |
642 | #[cfg(feature = "simd")] |
643 | let simd_quote_char = SimdVec::splat(quote_char); |
644 | Self { |
645 | quote_char, |
646 | eol_char, |
647 | #[cfg(feature = "simd")] |
648 | simd_eol_char, |
649 | #[cfg(feature = "simd")] |
650 | simd_quote_char, |
651 | quoting, |
652 | comment_prefix, |
653 | } |
654 | } |
655 | |
656 | /// Analyzes a chunk of CSV data. |
657 | /// |
    /// Returns (newline_count, last_newline_offset, end_inside_string) twice:
    /// the first assumes the start of the chunk is *not* inside a string,
    /// the second assumes the start is inside a string.
    ///
    /// If `comment_prefix` is not None, the start of `bytes` must be at the
    /// start of a line (and thus not in the middle of a comment).
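    ///
    /// A worked sketch for the chunk `a,"b\nc",d\n` with `"` quoting and `\n` EOL:
    ///
    /// ```text
    /// assuming the chunk starts outside a string:
    ///     newline_count: 1, last_newline_offset: 9, end_inside_string: false
    /// assuming the chunk starts inside a string:
    ///     newline_count: 1, last_newline_offset: 4, end_inside_string: true
    /// ```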
664 | pub fn analyze_chunk(&self, bytes: &[u8]) -> [LineStats; 2] { |
665 | let mut states = [ |
666 | LineStats { |
667 | newline_count: 0, |
668 | last_newline_offset: 0, |
669 | end_inside_string: false, |
670 | }, |
671 | LineStats { |
672 | newline_count: 0, |
673 | last_newline_offset: 0, |
674 | end_inside_string: false, |
675 | }, |
676 | ]; |
677 | |
678 | // If we have to deal with comments we can't use SIMD and have to explicitly do two passes. |
679 | if self.comment_prefix.is_some() { |
680 | states[0] = self.analyze_chunk_with_comment(bytes, false); |
681 | states[1] = self.analyze_chunk_with_comment(bytes, true); |
682 | return states; |
683 | } |
684 | |
685 | // False if even number of quotes seen so far, true otherwise. |
686 | #[allow(unused_assignments)] |
687 | let mut global_quote_parity = false; |
688 | let mut scan_offset = 0; |
689 | |
690 | #[cfg(feature = "simd")] |
691 | { |
692 | // 0 if even number of quotes seen so far, u64::MAX otherwise. |
693 | let mut global_quote_parity_mask = 0; |
694 | while scan_offset + 64 <= bytes.len() { |
695 | let block: [u8; 64] = unsafe { |
696 | bytes |
697 | .get_unchecked(scan_offset..scan_offset + 64) |
698 | .try_into() |
699 | .unwrap_unchecked() |
700 | }; |
701 | let simd_bytes = SimdVec::from(block); |
702 | let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask(); |
703 | if self.quoting { |
704 | let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask(); |
705 | let quote_parity = |
706 | prefix_xorsum_inclusive(quote_mask) ^ global_quote_parity_mask; |
707 | global_quote_parity_mask = ((quote_parity as i64) >> 63) as u64; |
708 | |
709 | let start_outside_string_eol_mask = eol_mask & !quote_parity; |
710 | states[0].newline_count += start_outside_string_eol_mask.count_ones() as usize; |
711 | states[0].last_newline_offset = select_unpredictable( |
712 | start_outside_string_eol_mask != 0, |
713 | (scan_offset + 63) |
714 | .wrapping_sub(start_outside_string_eol_mask.leading_zeros() as usize), |
715 | states[0].last_newline_offset, |
716 | ); |
717 | |
718 | let start_inside_string_eol_mask = eol_mask & quote_parity; |
719 | states[1].newline_count += start_inside_string_eol_mask.count_ones() as usize; |
720 | states[1].last_newline_offset = select_unpredictable( |
721 | start_inside_string_eol_mask != 0, |
722 | (scan_offset + 63) |
723 | .wrapping_sub(start_inside_string_eol_mask.leading_zeros() as usize), |
724 | states[1].last_newline_offset, |
725 | ); |
726 | } else { |
727 | states[0].newline_count += eol_mask.count_ones() as usize; |
728 | states[0].last_newline_offset = select_unpredictable( |
729 | eol_mask != 0, |
730 | (scan_offset + 63).wrapping_sub(eol_mask.leading_zeros() as usize), |
731 | states[0].last_newline_offset, |
732 | ); |
733 | } |
734 | |
735 | scan_offset += 64; |
736 | } |
737 | |
738 | global_quote_parity = global_quote_parity_mask > 0; |
739 | } |
740 | |
741 | while scan_offset < bytes.len() { |
742 | let c = unsafe { *bytes.get_unchecked(scan_offset) }; |
743 | global_quote_parity ^= (c == self.quote_char) & self.quoting; |
744 | |
745 | let state = &mut states[global_quote_parity as usize]; |
746 | state.newline_count += (c == self.eol_char) as usize; |
747 | state.last_newline_offset = |
748 | select_unpredictable(c == self.eol_char, scan_offset, state.last_newline_offset); |
749 | |
750 | scan_offset += 1; |
751 | } |
752 | |
753 | states[0].end_inside_string = global_quote_parity; |
754 | states[1].end_inside_string = !global_quote_parity; |
755 | states |
756 | } |
757 | |
758 | // bytes must begin at the start of a line. |
759 | fn analyze_chunk_with_comment(&self, bytes: &[u8], mut in_string: bool) -> LineStats { |
760 | let pre_s = match self.comment_prefix.as_ref().unwrap() { |
761 | CommentPrefix::Single(pc) => core::slice::from_ref(pc), |
762 | CommentPrefix::Multi(ps) => ps.as_bytes(), |
763 | }; |
764 | |
765 | let mut state = LineStats::default(); |
766 | let mut scan_offset = 0; |
767 | while scan_offset < bytes.len() { |
768 | // Skip comment line if needed. |
769 | while bytes[scan_offset..].starts_with(pre_s) { |
770 | scan_offset += pre_s.len(); |
771 | let Some(nl_off) = bytes[scan_offset..] |
772 | .iter() |
773 | .position(|c| *c == self.eol_char) |
774 | else { |
775 | break; |
776 | }; |
777 | scan_offset += nl_off + 1; |
778 | } |
779 | |
780 | while scan_offset < bytes.len() { |
781 | let c = unsafe { *bytes.get_unchecked(scan_offset) }; |
782 | in_string ^= (c == self.quote_char) & self.quoting; |
783 | |
784 | if c == self.eol_char && !in_string { |
785 | state.newline_count += 1; |
786 | state.last_newline_offset = scan_offset; |
787 | scan_offset += 1; |
788 | break; |
789 | } else { |
790 | scan_offset += 1; |
791 | } |
792 | } |
793 | } |
794 | |
795 | state.end_inside_string = in_string; |
796 | state |
797 | } |
798 | |
799 | pub fn find_next(&self, bytes: &[u8], chunk_size: &mut usize) -> (usize, usize) { |
800 | loop { |
801 | let b = unsafe { bytes.get_unchecked(..(*chunk_size).min(bytes.len())) }; |
802 | |
803 | let (count, offset) = if self.comment_prefix.is_some() { |
804 | let stats = self.analyze_chunk_with_comment(b, false); |
805 | (stats.newline_count, stats.last_newline_offset) |
806 | } else { |
807 | self.count(b) |
808 | }; |
809 | |
810 | if count > 0 || b.len() == bytes.len() { |
811 | return (count, offset); |
812 | } |
813 | |
814 | *chunk_size = chunk_size.saturating_mul(2); |
815 | } |
816 | } |
817 | |
818 | pub fn count_rows(&self, bytes: &[u8], is_eof: bool) -> (usize, usize) { |
819 | let stats = if self.comment_prefix.is_some() { |
820 | self.analyze_chunk_with_comment(bytes, false) |
821 | } else { |
822 | self.analyze_chunk(bytes)[0] |
823 | }; |
824 | |
825 | let mut count = stats.newline_count; |
826 | let mut offset = stats.last_newline_offset; |
827 | |
828 | if count > 0 { |
829 | offset = cmp::min(offset + 1, bytes.len()); |
830 | } else { |
831 | debug_assert!(offset == 0); |
832 | } |
833 | |
834 | if is_eof { |
835 | count += ends_in_unterminated_row(bytes, self.eol_char, self.comment_prefix.as_ref()) |
836 | as usize; |
837 | offset = bytes.len(); |
838 | } |
839 | |
840 | (count, offset) |
841 | } |
842 | |
    /// Returns the line count and the offset of the last EOL, which can be used
    /// to split off the remainder of the slice.
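    ///
    /// A minimal sketch (default `"` quoting, `\n` EOL):
    ///
    /// ```text
    /// count(b"a,b\nc,d\n")     -> (2, 7)  // 2 lines, last EOL at byte offset 7
    /// count(b"a,\"x\ny\",b\n") -> (1, 9)  // the quoted newline is not counted
    /// ```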
844 | #[cfg(feature = "simd")] |
845 | pub fn count(&self, bytes: &[u8]) -> (usize, usize) { |
846 | let mut total_idx = 0; |
847 | let original_bytes = bytes; |
848 | let mut count = 0; |
849 | let mut position = 0; |
850 | let mut not_in_field_previous_iter = true; |
851 | |
852 | loop { |
853 | let bytes = unsafe { original_bytes.get_unchecked(total_idx..) }; |
854 | |
855 | if bytes.len() > SIMD_SIZE { |
856 | let lane: [u8; SIMD_SIZE] = unsafe { |
857 | bytes |
858 | .get_unchecked(0..SIMD_SIZE) |
859 | .try_into() |
860 | .unwrap_unchecked() |
861 | }; |
862 | let simd_bytes = SimdVec::from(lane); |
863 | let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask(); |
864 | |
865 | let valid_eols = if self.quoting { |
866 | let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask(); |
867 | let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask); |
868 | |
869 | if not_in_field_previous_iter { |
870 | not_in_quote_field = !not_in_quote_field; |
871 | } |
872 | not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0; |
873 | eol_mask & not_in_quote_field |
874 | } else { |
875 | eol_mask |
876 | }; |
877 | |
878 | if valid_eols != 0 { |
879 | count += valid_eols.count_ones() as usize; |
880 | position = total_idx + 63 - valid_eols.leading_zeros() as usize; |
881 | debug_assert_eq!(original_bytes[position], self.eol_char) |
882 | } |
883 | total_idx += SIMD_SIZE; |
884 | } else if bytes.is_empty() { |
885 | debug_assert!(count == 0 || original_bytes[position] == self.eol_char); |
886 | return (count, position); |
887 | } else { |
888 | let (c, o) = self.count_no_simd(bytes, !not_in_field_previous_iter); |
889 | |
890 | let (count, position) = if c > 0 { |
891 | (count + c, total_idx + o) |
892 | } else { |
893 | (count, position) |
894 | }; |
895 | debug_assert!(count == 0 || original_bytes[position] == self.eol_char); |
896 | |
897 | return (count, position); |
898 | } |
899 | } |
900 | } |
901 | |
902 | #[cfg(not(feature = "simd"))] |
903 | pub fn count(&self, bytes: &[u8]) -> (usize, usize) { |
904 | self.count_no_simd(bytes, false) |
905 | } |
906 | |
907 | fn count_no_simd(&self, bytes: &[u8], in_field: bool) -> (usize, usize) { |
908 | let iter = bytes.iter(); |
909 | let mut in_field = in_field; |
910 | let mut count = 0; |
911 | let mut position = 0; |
912 | |
913 | for b in iter { |
914 | let c = *b; |
915 | if self.quoting && c == self.quote_char { |
916 | // toggle between string field enclosure |
917 | // if we encounter a starting '"' -> in_field = true; |
918 | // if we encounter a closing '"' -> in_field = false; |
919 | in_field = !in_field; |
920 | } |
921 | // If we are not in a string and we encounter '\n' we can stop at this position. |
922 | else if c == self.eol_char && !in_field { |
923 | position = (b as *const _ as usize) - (bytes.as_ptr() as usize); |
924 | count += 1; |
925 | } |
926 | } |
927 | debug_assert!(count == 0 || bytes[position] == self.eol_char); |
928 | |
929 | (count, position) |
930 | } |
931 | } |
932 | |
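/// Returns `true` when `bytes` ends in a final row that is not terminated by
/// `eol_char` and that row is not a comment line.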
933 | fn ends_in_unterminated_row( |
934 | bytes: &[u8], |
935 | eol_char: u8, |
936 | comment_prefix: Option<&CommentPrefix>, |
937 | ) -> bool { |
938 | if !bytes.is_empty() && bytes.last().copied().unwrap() != eol_char { |
939 | // We can do a simple backwards-scan to find the start of last line if it is a |
940 | // comment line, since comment lines can't escape new-lines. |
941 | let last_new_line_post = memchr::memrchr(eol_char, bytes).unwrap_or(0); |
942 | let last_line_is_comment_line = bytes |
943 | .get(last_new_line_post + 1..) |
944 | .map(|line| is_comment_line(line, comment_prefix)) |
945 | .unwrap_or(false); |
946 | |
947 | return !last_line_is_comment_line; |
948 | } |
949 | |
950 | false |
951 | } |
952 | |
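/// Find the first occurrence of `needle` that is not inside a quoted section
/// and return its byte offset.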
953 | #[inline] |
954 | fn find_quoted(bytes: &[u8], quote_char: u8, needle: u8) -> Option<usize> { |
955 | let mut in_field = false; |
956 | |
957 | let mut idx = 0u32; |
958 | // micro optimizations |
959 | #[allow(clippy::explicit_counter_loop)] |
960 | for &c in bytes.iter() { |
961 | if c == quote_char { |
962 | // toggle between string field enclosure |
963 | // if we encounter a starting '"' -> in_field = true; |
964 | // if we encounter a closing '"' -> in_field = false; |
965 | in_field = !in_field; |
966 | } |
967 | |
968 | if !in_field && c == needle { |
969 | return Some(idx as usize); |
970 | } |
971 | idx += 1; |
972 | } |
973 | None |
974 | } |
975 | |
976 | #[inline] |
977 | pub(super) fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &[u8] { |
978 | let pos = match quote { |
979 | Some(quote) => find_quoted(bytes, quote, eol_char), |
980 | None => bytes.iter().position(|x| *x == eol_char), |
981 | }; |
982 | match pos { |
983 | None => &[], |
984 | Some(pos) => &bytes[pos + 1..], |
985 | } |
986 | } |
987 | |
988 | #[inline] |
989 | pub(super) fn skip_this_line_naive(input: &[u8], eol_char: u8) -> &[u8] { |
990 | if let Some(pos) = next_line_position_naive(input, eol_char) { |
991 | unsafe { input.get_unchecked(pos..) } |
992 | } else { |
993 | &[] |
994 | } |
995 | } |
996 | |
997 | /// Parse CSV. |
998 | /// |
999 | /// # Arguments |
1000 | /// * `bytes` - input to parse |
1001 | /// * `offset` - offset in bytes in total input. This is 0 if single threaded. If multi-threaded every |
1002 | /// thread has a different offset. |
1003 | /// * `projection` - Indices of the columns to project. |
1004 | /// * `buffers` - Parsed output will be written to these buffers. Except for UTF8 data. The offsets of the |
1005 | /// fields are written to the buffers. The UTF8 data will be parsed later. |
1006 | /// |
1007 | /// Returns the number of bytes parsed successfully. |
1008 | #[allow(clippy::too_many_arguments)] |
1009 | pub(super) fn parse_lines( |
1010 | mut bytes: &[u8], |
1011 | parse_options: &CsvParseOptions, |
1012 | offset: usize, |
1013 | ignore_errors: bool, |
1014 | null_values: Option<&NullValuesCompiled>, |
1015 | projection: &[usize], |
1016 | buffers: &mut [Builder], |
1017 | n_lines: usize, |
1018 | // length of original schema |
1019 | schema_len: usize, |
1020 | schema: &Schema, |
1021 | ) -> PolarsResult<usize> { |
1022 | assert!( |
1023 | !projection.is_empty(), |
1024 | "at least one column should be projected" |
1025 | ); |
1026 | let mut truncate_ragged_lines = parse_options.truncate_ragged_lines; |
1027 | // During projection pushdown we are not checking other csv fields. |
1028 | // This would be very expensive and we don't care as we only want |
1029 | // the projected columns. |
1030 | if projection.len() != schema_len { |
1031 | truncate_ragged_lines = true |
1032 | } |
1033 | |
    // We use the pointers to track the number of bytes read.
1035 | let start = bytes.as_ptr() as usize; |
1036 | let original_bytes_len = bytes.len(); |
1037 | let n_lines = n_lines as u32; |
1038 | |
1039 | let mut line_count = 0u32; |
1040 | loop { |
1041 | if line_count > n_lines { |
1042 | let end = bytes.as_ptr() as usize; |
1043 | return Ok(end - start); |
1044 | } |
1045 | |
1046 | if bytes.is_empty() { |
1047 | return Ok(original_bytes_len); |
1048 | } else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) { |
1049 | // deal with comments |
1050 | let bytes_rem = skip_this_line_naive(bytes, parse_options.eol_char); |
1051 | bytes = bytes_rem; |
1052 | continue; |
1053 | } |
1054 | |
        // For every line we only need to parse the columns that are projected.
        // Therefore we check if the idx of the field is in our projected columns.
        // If it is not, we skip the field.
1058 | let mut projection_iter = projection.iter().copied(); |
1059 | let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() }; |
1060 | let mut processed_fields = 0; |
1061 | |
1062 | let mut iter = SplitFields::new( |
1063 | bytes, |
1064 | parse_options.separator, |
1065 | parse_options.quote_char, |
1066 | parse_options.eol_char, |
1067 | ); |
1068 | let mut idx = 0u32; |
1069 | let mut read_sol = 0; |
1070 | loop { |
1071 | match iter.next() { |
1072 | // end of line |
1073 | None => { |
1074 | bytes = unsafe { bytes.get_unchecked(std::cmp::min(read_sol, bytes.len())..) }; |
1075 | break; |
1076 | }, |
1077 | Some((mut field, needs_escaping)) => { |
1078 | let field_len = field.len(); |
1079 | |
1080 | // +1 is the split character that is consumed by the iterator. |
1081 | read_sol += field_len + 1; |
1082 | |
1083 | if idx == next_projected as u32 { |
1084 | // the iterator is finished when it encounters a `\n` |
1085 | // this could be preceded by a '\r' |
1086 | unsafe { |
1087 | if field_len > 0 && *field.get_unchecked(field_len - 1) == b'\r' { |
1088 | field = field.get_unchecked(..field_len - 1); |
1089 | } |
1090 | } |
1091 | |
1092 | debug_assert!(processed_fields < buffers.len()); |
1093 | let buf = unsafe { |
1094 | // SAFETY: processed fields index can never exceed the projection indices. |
1095 | buffers.get_unchecked_mut(processed_fields) |
1096 | }; |
1097 | let mut add_null = false; |
1098 | |
                        // If we have a null-values argument, check if this field equals a null value.
1100 | if let Some(null_values) = null_values { |
1101 | let field = if needs_escaping && !field.is_empty() { |
1102 | unsafe { field.get_unchecked(1..field.len() - 1) } |
1103 | } else { |
1104 | field |
1105 | }; |
1106 | |
1107 | // SAFETY: |
1108 | // process fields is in bounds |
1109 | add_null = unsafe { null_values.is_null(field, idx as usize) } |
1110 | } |
1111 | if add_null { |
1112 | buf.add_null(!parse_options.missing_is_null && field.is_empty()) |
1113 | } else { |
1114 | buf.add(field, ignore_errors, needs_escaping, parse_options.missing_is_null) |
1115 | .map_err(|e| { |
1116 | let bytes_offset = offset + field.as_ptr() as usize - start; |
1117 | let unparsable = String::from_utf8_lossy(field); |
1118 | let column_name = schema.get_at_index(idx as usize).unwrap().0; |
1119 | polars_err!( |
1120 | ComputeError: |
1121 | "could not parse `{}` as dtype `{}` at column '{}' (column number {})\n\n\ |
1122 | The current offset in the file is {} bytes.\n\ |
1123 | \n\ |
1124 | You might want to try:\n\ |
1125 | - increasing `infer_schema_length` (e.g. `infer_schema_length=10000`),\n\ |
1126 | - specifying correct dtype with the `schema_overrides` argument\n\ |
1127 | - setting `ignore_errors` to `True`,\n\ |
1128 | - adding `{}` to the `null_values` list.\n\n\ |
1129 | Original error: ```{}```", |
1130 | &unparsable, |
1131 | buf.dtype(), |
1132 | column_name, |
1133 | idx + 1, |
1134 | bytes_offset, |
1135 | &unparsable, |
1136 | e |
1137 | ) |
1138 | })?; |
1139 | } |
1140 | processed_fields += 1; |
1141 | |
1142 | // if we have all projected columns we are done with this line |
1143 | match projection_iter.next() { |
1144 | Some(p) => next_projected = p, |
1145 | None => { |
1146 | if bytes.get(read_sol - 1) == Some(&parse_options.eol_char) { |
1147 | bytes = unsafe { bytes.get_unchecked(read_sol..) }; |
1148 | } else { |
1149 | if !truncate_ragged_lines && read_sol < bytes.len() { |
1150 | polars_bail!(ComputeError: r#"found more fields than defined in 'Schema' |
1151 | |
1152 | Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE) |
1153 | } |
1154 | let bytes_rem = skip_this_line( |
1155 | unsafe { bytes.get_unchecked(read_sol - 1..) }, |
1156 | parse_options.quote_char, |
1157 | parse_options.eol_char, |
1158 | ); |
1159 | bytes = bytes_rem; |
1160 | } |
1161 | break; |
1162 | }, |
1163 | } |
1164 | } |
1165 | idx += 1; |
1166 | }, |
1167 | } |
1168 | } |
1169 | |
        // There can be lines that are missing fields (including the separator values).
        // This means the splitter won't process them.
        // We traverse them to read them as null values.
1173 | while processed_fields < projection.len() { |
1174 | debug_assert!(processed_fields < buffers.len()); |
1175 | let buf = unsafe { |
1176 | // SAFETY: processed fields index can never exceed the projection indices. |
1177 | buffers.get_unchecked_mut(processed_fields) |
1178 | }; |
1179 | buf.add_null(!parse_options.missing_is_null); |
1180 | processed_fields += 1; |
1181 | } |
1182 | line_count += 1; |
1183 | } |
1184 | } |
1185 | |
1186 | #[cfg(test)] |
1187 | mod test { |
1188 | use super::SplitLines; |
1189 | |
1190 | #[test] |
1191 | fn test_splitlines() { |
1192 | let input = "1,\"foo\n\"\n2,\"foo\n\"\n"; |
1193 | let mut lines = SplitLines::new(input.as_bytes(), Some(b'"'), b'\n', None); |
1194 | assert_eq!(lines.next(), Some("1,\"foo\n\"".as_bytes())); |
1195 | assert_eq!(lines.next(), Some("2,\"foo\n\"".as_bytes())); |
1196 | assert_eq!(lines.next(), None); |
1197 | |
1198 | let input2 = "1,'foo\n'\n2,'foo\n'\n"; |
1199 | let mut lines2 = SplitLines::new(input2.as_bytes(), Some(b'\''), b'\n', None); |
1200 | assert_eq!(lines2.next(), Some("1,'foo\n'".as_bytes())); |
1201 | assert_eq!(lines2.next(), Some("2,'foo\n'".as_bytes())); |
1202 | assert_eq!(lines2.next(), None); |
1203 | } |
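
    // A small additional sanity check (a non-exhaustive sketch) over the helper
    // functions in this module; it uses the default `"` quote char and `\n` EOL.
    #[test]
    fn test_count_lines_and_helpers() {
        use super::{
            CommentPrefix, CountLines, is_comment_line, next_line_position_naive, skip_whitespace,
        };

        // Comment detection with a single-byte prefix.
        assert!(is_comment_line(
            b"# a comment",
            Some(&CommentPrefix::Single(b'#'))
        ));
        assert!(!is_comment_line(
            b"field_1,field_2",
            Some(&CommentPrefix::Single(b'#'))
        ));
        assert!(!is_comment_line(b"# a comment", None));

        // The naive search returns the offset just past the first EOL, unless
        // that EOL terminates the input.
        assert_eq!(next_line_position_naive(b"a,b\nc,d\n", b'\n'), Some(4));
        assert_eq!(next_line_position_naive(b"a,b\n", b'\n'), None);

        // Leading spaces and tabs are stripped.
        assert_eq!(skip_whitespace(b"  \ta,b"), &b"a,b"[..]);

        // Newlines embedded in quoted fields are not counted as rows.
        let counter = CountLines::new(Some(b'"'), b'\n', None);
        assert_eq!(counter.count(b"a,b\nc,d\n"), (2, 7));
        assert_eq!(counter.count(b"a,\"x\ny\",b\n"), (1, 9));
    }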
1204 | } |