Concurso: la forma más rápida de ordenar una gran variedad de datos distribuidos por Gauss

71

Siguiendo el interés en esta pregunta , pensé que sería interesante hacer las respuestas un poco más objetivas y cuantitativas al proponer un concurso.

La idea es simple: he generado un archivo binario que contiene 50 millones de dobles distribuidos en gauss (promedio: 0, stdev 1). El objetivo es hacer un programa que los clasifique en la memoria lo más rápido posible. Una implementación de referencia muy simple en python tarda 1m4s en completarse. ¿Qué tan bajo podemos ir?

Las reglas son las siguientes: responda con un programa que abra el archivo "gaussian.dat" y clasifique los números en la memoria (sin necesidad de generarlos), e instrucciones para compilar y ejecutar el programa. El programa debe poder funcionar en mi máquina Arch Linux (lo que significa que puede usar cualquier lenguaje de programación o biblioteca que sea fácilmente instalable en este sistema).

El programa debe ser razonablemente legible, de modo que pueda asegurarme de que sea seguro iniciarlo (¡no hay una solución solo para ensambladores, por favor!).

Ejecutaré las respuestas en mi máquina (cuatro núcleos, 4 Gigabytes de RAM). La solución más rápida obtendrá la respuesta aceptada y una recompensa de 100 puntos :)

El programa utilizado para generar los números:

#!/usr/bin/env python
import random
from array import array
from sys import argv
count=int(argv[1])
a=array('d',(random.gauss(0,1) for x in xrange(count)))
f=open("gaussian.dat","wb")
a.tofile(f)

La implementación de referencia simple:

#!/usr/bin/env python
from array import array
from sys import argv
count=int(argv[1])
a=array('d')
a.fromfile(open("gaussian.dat"),count)
print "sorting..."
b=sorted(a)

EDITAR: solo 4 GB de RAM, lo siento

EDITAR # 2: Tenga en cuenta que el objetivo del concurso es ver si podemos usar información previa sobre los datos . ¡no se supone que sea una meada coincidencia entre diferentes implementaciones de lenguaje de programación!

static_rtti
fuente
1
Tome cada valor y muévalo directamente a su posición "esperada", repita para el valor desplazado. No estoy seguro de cómo resolver un problema de pareja con eso. Cuando termine, clasifique las burbujas hasta completarlas (un par de pases deberían hacerlo).
1
Publicaré una solución de clasificación de cubetas mañana por la noche si esto no se ha cerrado para entonces :)
1
@static_rtti: como gran usuario de CG, este es exactamente el tipo de cosas que "nos gusta" piratear en CG.SE. Para cualquier mod de lectura, mueva esto a CG, no lo cierre.
Arrdem
1
Bienvenido a CodeGolf.SE! He borrado gran parte de los comentarios del SO original sobre dónde pertenece o no, y volví a etiquetar para estar más cerca de la corriente principal de CodeGolf.SE.
dmckee
2
El único problema difícil aquí es que buscamos criterios objetivos ganadores, y "más rápido" introduce dependencias de la plataforma ... ¿un algoritmo O (n ^ {1.2}) implementado en la máquina virtual cpython supera un O (n ^ {1.3} ) algoritmo con una constante similar implementada en c? Generalmente sugiero que se discutan las características de rendimiento de cada solución, ya que esto puede ayudar a las personas a juzgar lo que está sucediendo.
dmckee

Respuestas:

13

Aquí hay una solución en C ++ que primero divide los números en cubos con el mismo número esperado de elementos y luego clasifica cada cubo por separado. Precalcula una tabla de la función de distribución acumulativa basada en algunas fórmulas de Wikipedia y luego interpola los valores de esta tabla para obtener una aproximación rápida.

Varios pasos se ejecutan en múltiples hilos para hacer uso de los cuatro núcleos.

#include <cstdlib>
#include <math.h>
#include <stdio.h>
#include <algorithm>

#include <tbb/parallel_for.h>

using namespace std;

typedef unsigned long long ull;

double signum(double x) {
    return (x<0) ? -1 : (x>0) ? 1 : 0;
}

const double fourOverPI = 4 / M_PI;

double erf(double x) {
    double a = 0.147;
    double x2 = x*x;
    double ax2 = a*x2;
    double f1 = -x2 * (fourOverPI + ax2) / (1 + ax2);
    double s1 = sqrt(1 - exp(f1));
    return signum(x) * s1;
}

const double sqrt2 = sqrt(2);

double cdf(double x) {
    return 0.5 + erf(x / sqrt2) / 2;
}

const int cdfTableSize = 200;
const double cdfTableLimit = 5;
double* computeCdfTable(int size) {
    double* res = new double[size];
    for (int i = 0; i < size; ++i) {
        res[i] = cdf(cdfTableLimit * i / (size - 1));
    }
    return res;
}
const double* const cdfTable = computeCdfTable(cdfTableSize);

double cdfApprox(double x) {
    bool negative = (x < 0);
    if (negative) x = -x;
    if (x > cdfTableLimit) return negative ? cdf(-x) : cdf(x);
    double p = (cdfTableSize - 1) * x / cdfTableLimit;
    int below = (int) p;
    if (p == below) return negative ? -cdfTable[below] : cdfTable[below];
    int above = below + 1;
    double ret = cdfTable[below] +
            (cdfTable[above] - cdfTable[below])*(p - below);
    return negative ? 1 - ret : ret;
}

void print(const double* arr, int len) {
    for (int i = 0; i < len; ++i) {
        printf("%e; ", arr[i]);
    }
    puts("");
}

void print(const int* arr, int len) {
    for (int i = 0; i < len; ++i) {
        printf("%d; ", arr[i]);
    }
    puts("");
}

void fillBuckets(int N, int bucketCount,
        double* data, int* partitions,
        double* buckets, int* offsets) {
    for (int i = 0; i < N; ++i) {
        ++offsets[partitions[i]];
    }

    int offset = 0;
    for (int i = 0; i < bucketCount; ++i) {
        int t = offsets[i];
        offsets[i] = offset;
        offset += t;
    }
    offsets[bucketCount] = N;

    int next[bucketCount];
    memset(next, 0, sizeof(next));
    for (int i = 0; i < N; ++i) {
        int p = partitions[i];
        int j = offsets[p] + next[p];
        ++next[p];
        buckets[j] = data[i];
    }
}

class Sorter {
public:
    Sorter(double* data, int* offsets) {
        this->data = data;
        this->offsets = offsets;
    }

    static void radixSort(double* arr, int len) {
        ull* encoded = (ull*)arr;
        for (int i = 0; i < len; ++i) {
            ull n = encoded[i];
            if (n & signBit) {
                n ^= allBits;
            } else {
                n ^= signBit;
            }
            encoded[i] = n;
        }

        const int step = 11;
        const ull mask = (1ull << step) - 1;
        int offsets[8][1ull << step];
        memset(offsets, 0, sizeof(offsets));

        for (int i = 0; i < len; ++i) {
            for (int b = 0, j = 0; b < 64; b += step, ++j) {
                int p = (encoded[i] >> b) & mask;
                ++offsets[j][p];
            }
        }

        int sum[8] = {0};
        for (int i = 0; i <= mask; i++) {
            for (int b = 0, j = 0; b < 64; b += step, ++j) {
                int t = sum[j] + offsets[j][i];
                offsets[j][i] = sum[j];
                sum[j] = t;
            }
        }

        ull* copy = new ull[len];
        ull* current = encoded;
        for (int b = 0, j = 0; b < 64; b += step, ++j) {
            for (int i = 0; i < len; ++i) {
                int p = (current[i] >> b) & mask;
                copy[offsets[j][p]] = current[i];
                ++offsets[j][p];
            }

            ull* t = copy;
            copy = current;
            current = t;
        }

        if (current != encoded) {
            for (int i = 0; i < len; ++i) {
                encoded[i] = current[i];
            }
        }

        for (int i = 0; i < len; ++i) {
            ull n = encoded[i];
            if (n & signBit) {
                n ^= signBit;
            } else {
                n ^= allBits;
            }
            encoded[i] = n;
        }
    }

    void operator() (tbb::blocked_range<int>& range) const {
        for (int i = range.begin(); i < range.end(); ++i) {
            double* begin = &data[offsets[i]];
            double* end = &data[offsets[i+1]];
            //std::sort(begin, end);
            radixSort(begin, end-begin);
        }
    }

private:
    double* data;
    int* offsets;
    static const ull signBit = 1ull << 63;
    static const ull allBits = ~0ull;
};

void sortBuckets(int bucketCount, double* data, int* offsets) {
    Sorter sorter(data, offsets);
    tbb::blocked_range<int> range(0, bucketCount);
    tbb::parallel_for(range, sorter);
    //sorter(range);
}

class Partitioner {
public:
    Partitioner(int bucketCount, double* data, int* partitions) {
        this->data = data;
        this->partitions = partitions;
        this->bucketCount = bucketCount;
    }

    void operator() (tbb::blocked_range<int>& range) const {
        for (int i = range.begin(); i < range.end(); ++i) {
            double d = data[i];
            int p = (int) (cdfApprox(d) * bucketCount);
            partitions[i] = p;
        }
    }

private:
    double* data;
    int* partitions;
    int bucketCount;
};

const int bucketCount = 512;
int offsets[bucketCount + 1];

int main(int argc, char** argv) {
    if (argc != 2) {
        printf("Usage: %s N\n N = the size of the input\n", argv[0]);
        return 1;
    }

    puts("initializing...");
    int N = atoi(argv[1]);
    double* data = new double[N];
    double* buckets = new double[N];
    memset(offsets, 0, sizeof(offsets));
    int* partitions = new int[N];

    puts("loading data...");
    FILE* fp = fopen("gaussian.dat", "rb");
    if (fp == 0 || fread(data, sizeof(*data), N, fp) != N) {
        puts("Error reading data");
        return 1;
    }
    //print(data, N);

    puts("assigning partitions...");
    tbb::parallel_for(tbb::blocked_range<int>(0, N),
            Partitioner(bucketCount, data, partitions));

    puts("filling buckets...");
    fillBuckets(N, bucketCount, data, partitions, buckets, offsets);
    data = buckets;

    puts("sorting buckets...");
    sortBuckets(bucketCount, data, offsets);

    puts("done.");

    /*
    for (int i = 0; i < N-1; ++i) {
        if (data[i] > data[i+1]) {
            printf("error at %d: %e > %e\n", i, data[i], data[i+1]);
        }
    }
    */

    //print(data, N);

    return 0;
}

Para compilarlo y ejecutarlo, use este comando:

g++ -O3 -ltbb -o gsort gsort.cpp && time ./gsort 50000000

EDITAR: Todos los cubos ahora se colocan en la misma matriz para eliminar la necesidad de copiar los cubos nuevamente en la matriz. También se redujo el tamaño de la tabla con valores calculados previamente, porque los valores son lo suficientemente precisos. Aún así, si cambio el número de cubos por encima de 256, el programa tarda más en ejecutarse que con ese número de cubos.

EDITAR: El mismo algoritmo, diferente lenguaje de programación. Usé C ++ en lugar de Java y el tiempo de ejecución se redujo de ~ 3.2s a ~ 2.35s en mi máquina. El número óptimo de cubos sigue siendo de alrededor de 256 (de nuevo, en mi computadora).

Por cierto, tbb realmente es increíble.

EDITAR: Me inspiró la gran solución de Alexandru y reemplacé std :: sort en la última fase por una versión modificada de su clasificación radix. Utilicé un método diferente para lidiar con los números positivos / negativos, a pesar de que necesita más pases a través de la matriz. También decidí ordenar la matriz exactamente y eliminar la clasificación de inserción. Más tarde pasaré un tiempo probando cómo estos cambios influyen en el rendimiento y posiblemente los reviertan. Sin embargo, al usar la clasificación de radix, el tiempo disminuyó de ~ 2.35s a ~ 1.63s.

k21
fuente
Agradable. Tengo 3.055 en la mía. Lo más bajo que pude conseguir el mío fue 6.3. Estoy seleccionando el tuyo para mejorar las estadísticas. ¿Por qué elegiste 256 como el número de cubos? Intenté 128 y 512, pero 256 funcionó mejor.
Scott
¿Por qué elegí 256 como el número de cubos? Intenté 128 y 512, pero 256 funcionó mejor. :) Lo encontré empíricamente y no estoy seguro de por qué aumentar el número de depósitos ralentiza el algoritmo: la asignación de memoria no debería llevar tanto tiempo. Tal vez algo relacionado con el tamaño de caché?
k21
2.725s en mi máquina. Bastante agradable para una solución de Java, teniendo en cuenta el tiempo de carga de la JVM.
static_rtti
2
Cambié su código para usar los paquetes de nio, según mi solución y la de Arjan (usé su sintaxis, ya que era más limpia que la mía) y pude obtener .3 segundos más rápido. Tengo un SSD, me pregunto cuáles serían las implicaciones si no. También elimina algunos de tus pequeños giros. Las secciones modificadas están aquí.
Scott
3
Esta es la solución paralela más rápida en mis pruebas (16core cpu). 1.22s lejos del segundo lugar de 1.94s.
Alexandru
13

Sin ser inteligente, solo para proporcionar un clasificador ingenuo mucho más rápido, aquí hay uno en C que debería ser más o menos equivalente a su Python:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
int cmp(const void* av, const void* bv) {
    double a = *(const double*)av;
    double b = *(const double*)bv;
    return a < b ? -1 : a > b ? 1 : 0;
}
int main(int argc, char** argv) {
    if (argc <= 1)
        return puts("No argument!");
    unsigned count = atoi(argv[1]);

    double *a = malloc(count * sizeof *a);

    FILE *f = fopen("gaussian.dat", "rb");
    if (fread(a, sizeof *a, count, f) != count)
        return puts("fread failed!");
    fclose(f);

    puts("sorting...");
    double *b = malloc(count * sizeof *b);
    memcpy(b, a, count * sizeof *b);
    qsort(b, count, sizeof *b, cmp);
    return 0;
}

Compilado con gcc -O3, en mi máquina, esto lleva más de un minuto menos que el Python: aproximadamente 11 s en comparación con 87 s.


fuente
1
Tomó 10.086s en mi máquina, lo que te convierte en el líder actual. Pero estoy bastante seguro de que podemos hacerlo mejor :)
1
¿Podría intentar eliminar el segundo operador ternario y simplemente devolver 1 para ese caso porque los dobles aleatorios no son iguales entre sí en esta cantidad de datos?
Codism
@Codism: Agregaría que no nos importa intercambiar ubicaciones de datos equivalentes, por lo tanto, incluso si pudiéramos obtener valores equivalentes, sería una simplificación adecuada.
10

Particioné en segmentos basados ​​en la desviación estándar que mejor debería dividirlo en 4tos. Editar: reescrito en la partición en función del valor x en http://en.wikipedia.org/wiki/Error_function#Table_of_values

http://www.wolframalpha.com/input/?i=percentages+by++normal+distribution

Intenté usar cubos más pequeños, pero parecía tener poco efecto una vez 2 * más allá del número de núcleos disponibles. Sin ninguna colección paralela, tomaría 37 segundos en mi caja y 24 con las colecciones paralelas. Si particiona a través de la distribución, no puede simplemente usar una matriz, por lo que hay algo más de sobrecarga. No tengo claro cuándo un valor estaría encuadrado / sin encuadrar en scala.

Estoy usando scala 2.9, para la colección paralela. Simplemente puede descargar la distribución tar.gz de la misma.

Para compilar: scalac SortFile.scala (acabo de copiarlo directamente en la carpeta scala / bin.

Para ejecutar: JAVA_OPTS = "- Xmx4096M" ./scala SortFile (lo ejecuté con 2 gigas de ram y obtuve casi al mismo tiempo)

Editar: Eliminado allocateDirect, más lento que solo asignar. Se eliminó el cebado del tamaño inicial para los buffers de matriz. Realmente lo hizo leer los valores completos de 50000000. Reescrito para evitar problemas con el autoboxing (aún más lento que ingenuo c)

import java.io.FileInputStream;
import java.nio.ByteBuffer
import java.nio.ByteOrder
import scala.collection.mutable.ArrayBuilder


object SortFile {

//used partition numbers from Damascus' solution
val partList = List(0, 0.15731, 0.31864, 0.48878, 0.67449, 0.88715, 1.1503, 1.5341)

val listSize = partList.size * 2;
val posZero = partList.size;
val neg = partList.map( _ * -1).reverse.zipWithIndex
val pos = partList.map( _ * 1).zipWithIndex.reverse

def partition(dbl:Double): Int = { 

//for each partition, i am running through the vals in order
//could make this a binary search to be more performant... but our list size is 4 (per side)

  if(dbl < 0) { return neg.find( dbl < _._1).get._2  }
  if(dbl > 0) { return posZero  + pos.find( dbl > _._1).get._2  }
      return posZero; 

}

  def main(args: Array[String])
    { 

    var l = 0
    val dbls = new Array[Double](50000000)
    val partList = new Array[Int](50000000)
    val pa = Array.fill(listSize){Array.newBuilder[Double]}
    val channel = new FileInputStream("../../gaussian.dat").getChannel()
    val bb = ByteBuffer.allocate(50000000 * 8)
    bb.order(ByteOrder.LITTLE_ENDIAN)
    channel.read(bb)
    bb.rewind
    println("Loaded" + System.currentTimeMillis())
    var dbl = 0.0
    while(bb.hasRemaining)
    { 
      dbl = bb.getDouble
      dbls.update(l,dbl) 

      l+=1
    }
    println("Beyond first load" + System.currentTimeMillis());

    for( i <- (0 to 49999999).par) { partList.update(i, partition(dbls(i)))}

    println("Partition computed" + System.currentTimeMillis() )
    for(i <- (0 to 49999999)) { pa(partList(i)) += dbls(i) }
    println("Partition completed " + System.currentTimeMillis())
    val toSort = for( i <- pa) yield i.result()
    println("Arrays Built" + System.currentTimeMillis());
    toSort.par.foreach{i:Array[Double] =>scala.util.Sorting.quickSort(i)};

    println("Read\t" + System.currentTimeMillis());

  }
}
Scott
fuente
1
8.185s! Supongo que es bueno para una solución de escala ... Además, ¡bravo por proporcionar la primera solución que realmente utiliza la distribución gaussiana de alguna manera!
1
Solo tenía el objetivo de competir con la solución de c #. No pensé que superaría a c / c ++. Además ... se comporta de manera muy diferente para ti que para mí. Estoy usando openJDK en mi extremo y es mucho más lento. Me pregunto si agregar más particiones ayudaría a su entorno.
Scott
9

Simplemente ponga esto en un archivo cs y compílelo con csc en teoría: (Requiere mono)

using System;
using System.IO;
using System.Threading;

namespace Sort
{
    class Program
    {
        const int count = 50000000;
        static double[][] doubles;
        static WaitHandle[] waiting = new WaitHandle[4];
        static AutoResetEvent[] events = new AutoResetEvent[4];

        static double[] Merge(double[] left, double[] right)
        {
            double[] result = new double[left.Length + right.Length];
            int l = 0, r = 0, spot = 0;
            while (l < left.Length && r < right.Length)
            {
                if (right[r] < left[l])
                    result[spot++] = right[r++];
                else
                    result[spot++] = left[l++];
            }
            while (l < left.Length) result[spot++] = left[l++];
            while (r < right.Length) result[spot++] = right[r++];
            return result;
        }

        static void ThreadStart(object data)
        {
            int index = (int)data;
            Array.Sort(doubles[index]);
            events[index].Set();
        }

        static void Main(string[] args)
        {
            System.Diagnostics.Stopwatch watch = new System.Diagnostics.Stopwatch();
            watch.Start();
            byte[] bytes = File.ReadAllBytes(@"..\..\..\SortGuassian\Data.dat");
            doubles = new double[][] { new double[count / 4], new double[count / 4], new double[count / 4], new double[count / 4] };
            for (int i = 0; i < 4; i++)
            {
                for (int j = 0; j < count / 4; j++)
                {
                    doubles[i][j] = BitConverter.ToDouble(bytes, i * count/4 + j * 8);
                }
            }
            Thread[] threads = new Thread[4];
            for (int i = 0; i < 4; i++)
            {
                threads[i] = new Thread(ThreadStart);
                waiting[i] = events[i] = new AutoResetEvent(false);
                threads[i].Start(i);
            }
            WaitHandle.WaitAll(waiting);
            double[] left = Merge(doubles[0], doubles[1]);
            double[] right = Merge(doubles[2], doubles[3]);
            double[] result = Merge(left, right);
            watch.Stop();
            Console.WriteLine(watch.Elapsed.ToString());
            Console.ReadKey();
        }
    }
}
Guvante
fuente
¿Puedo ejecutar sus soluciones con Mono? ¿Cómo debería hacerlo?
No he usado Mono, no pensé en eso, deberías poder compilar el F # y luego ejecutarlo.
1
Actualizado para usar cuatro hilos para mejorar el rendimiento. Ahora me da 6 segundos. Tenga en cuenta que esto podría mejorarse significativamente (5 segundos probablemente) si usa solo una matriz de repuesto y evita inicializar una tonelada de memoria a cero, lo que hace el CLR, ya que todo se escribe al menos una vez.
1
9.598s en mi máquina! Eres el líder actual :)
1
¡Mi madre me dijo que me mantuviera alejado de los chicos con Mono!
8

Como sabe cuál es la distribución, puede usar una clasificación O (N) de indexación directa. (Si te estás preguntando qué es eso, supongamos que tienes un mazo de 52 cartas y quieres ordenarlo. Solo tienes 52 contenedores y tira cada tarjeta en su propio contenedor).

Tienes 5e7 dobles. Asigne una matriz de resultados R de 5e7 dobles. Toma cada número xy obtén i = phi(x) * 5e7. Básicamente hacer R[i] = x. Tenga una manera de manejar las colisiones, como mover el número con el que puede estar colisionando (como en una simple codificación hash). Alternativamente, puede hacer que R sea un poco más grande, lleno de un valor vacío único . Al final, solo barres los elementos de R.

phies solo la función de distribución acumulativa gaussiana. Convierte un número distribuido gaussiano entre +/- infinito en un número distribuido uniforme entre 0 y 1. Una forma sencilla de calcularlo es mediante la búsqueda de tablas y la interpolación.


fuente
3
Tenga cuidado: conoce la distribución aproximada, no la distribución exacta. Usted sabe que los datos se generaron usando una ley gaussiana, pero como es finita, no sigue exactamente a una gaussiana.
@static_rtti: en este caso, la aproximación necesaria de phi crearía una molestia mayor que cualquier irregularidad en el conjunto de datos IMO.
1
@static_rtti: no tiene que ser exacto. Solo tiene que distribuir los datos para que sea aproximadamente uniforme, por lo que no se acumula demasiado en algunos lugares.
Supongamos que tienes 5e7 dobles. ¿Por qué no hacer que cada entrada en R sea un vector de, digamos, 5e6 vectores de doble? Luego, empuje hacia atrás cada doble en su vector apropiado. Ordena los vectores y listo. Esto debería tomar tiempo lineal en el tamaño de la entrada.
Neil G
En realidad, veo que a mdkess ya se le ocurrió esa solución.
Neil G
8

Aquí hay otra solución secuencial:

#include <stdio.h>
#include <stdlib.h>
#include <algorithm>
#include <ctime>

typedef unsigned long long ull;

int size;
double *dbuf, *copy;
int cnt[8][1 << 16];

void sort()
{
  const int step = 10;
  const int start = 24;
  ull mask = (1ULL << step) - 1;

  ull *ibuf = (ull *) dbuf;
  for (int i = 0; i < size; i++) {
    for (int w = start, v = 0; w < 64; w += step, v++) {
      int p = (~ibuf[i] >> w) & mask;
      cnt[v][p]++;
    }
  }

  int sum[8] = { 0 };
  for (int i = 0; i <= mask; i++) {
    for (int w = start, v = 0; w < 64; w += step, v++) {
      int tmp = sum[v] + cnt[v][i];
      cnt[v][i] = sum[v];
      sum[v] = tmp;
    }
  }

  for (int w = start, v = 0; w < 64; w += step, v++) {
    ull *ibuf = (ull *) dbuf;
    for (int i = 0; i < size; i++) {
      int p = (~ibuf[i] >> w) & mask;
      copy[cnt[v][p]++] = dbuf[i];
    }

    double *tmp = copy;
    copy = dbuf;
    dbuf = tmp;
  }

  for (int p = 0; p < size; p++)
    if (dbuf[p] >= 0.) {
      std::reverse(dbuf + p, dbuf + size);
      break;
    }

  // Insertion sort
  for (int i = 1; i < size; i++) {
    double value = dbuf[i];
    if (value < dbuf[i - 1]) {
      dbuf[i] = dbuf[i - 1];
      int p = i - 1;
      for (; p > 0 && value < dbuf[p - 1]; p--)
        dbuf[p] = dbuf[p - 1];
      dbuf[p] = value;
    }
  }
}

int main(int argc, char **argv) {
  size = atoi(argv[1]);
  dbuf = new double[size];
  copy = new double[size];

  FILE *f = fopen("gaussian.dat", "r");
  fread(dbuf, size, sizeof(double), f);
  fclose(f);

  clock_t c0 = clock();
  sort();
  printf("Finished after %.3f\n", (double) ((clock() - c0)) / CLOCKS_PER_SEC);
  return 0;
}

Dudo que supere la solución de subprocesos múltiples, pero los tiempos en mi computadora portátil i7 son (stdsort es la solución C ++ proporcionada en otra respuesta):

$ g++ -O3 mysort.cpp -o mysort && ./mysort 50000000
Finished after 2.10
$ g++ -O3 stdsort.cpp -o stdsort && ./stdsort
Finished after 7.12

Tenga en cuenta que esta solución tiene una complejidad de tiempo lineal (porque utiliza la representación especial de dobles).

EDITAR : se corrigió el orden de los elementos para aumentar.

EDITAR : Velocidad mejorada en casi medio segundo.

EDITAR : Velocidad mejorada por otros 0.7 segundos. Hizo que el algoritmo sea más amigable con el caché.

EDITAR : Velocidad mejorada por otro 1 segundo. Como solo hay 50.000.000 de elementos, puedo clasificar parcialmente la mantisa y usar la ordenación por inserción (que es compatible con la caché) para arreglar elementos fuera de lugar. Esta idea elimina alrededor de dos iteraciones del último ciclo de clasificación de radix.

EDITAR : 0.16 menos segundos. Primero std :: reverse puede eliminarse si se invierte el orden de clasificación.

Alexandru
fuente
¡Ahora eso se está poniendo interesante! ¿Qué tipo de algoritmo de clasificación es?
static_rtti
2
Clasificación de radix con menos dígitos significativos . Puedes ordenar la mantisa, luego el exponente, luego el signo. El algoritmo presentado aquí lleva esta idea un paso más allá. Se puede paralelizar usando una idea de partición proporcionada en una respuesta diferente.
Alexandru
Bastante rápido para una solución de un solo hilo: 2.552s! ¿Cree que podría cambiar su solución para aprovechar el hecho de que los datos se distribuyen normalmente? Probablemente podría hacerlo mejor que las mejores soluciones multiproceso actuales.
static_rtti
1
@static_rtti: veo que Damascus Steel ya publicó una versión multiproceso de esta implementación. Mejoré el comportamiento de almacenamiento en caché de este algoritmo, por lo que ahora debería obtener mejores tiempos. Por favor, pruebe esta nueva versión.
Alexandru
2
1.459 en mis últimas pruebas. Si bien esta solución no es la ganadora según mis reglas, realmente merece grandes felicitaciones. ¡Felicidades!
static_rtti
6

Tomando la solución de Christian Ammer y paralelizándola con los bloques de construcción roscados de Intel

#include <iostream>
#include <fstream>
#include <algorithm>
#include <vector>
#include <ctime>
#include <tbb/parallel_sort.h>

int main(void)
{
    std::ifstream ifs("gaussian.dat", std::ios::binary | std::ios::in);
    std::vector<double> values;
    values.reserve(50000000);
    double d;
    while (ifs.read(reinterpret_cast<char*>(&d), sizeof(double)))
    values.push_back(d);
    clock_t c0 = clock();
    tbb::parallel_sort(values.begin(), values.end());
    std::cout << "Finished after "
              << static_cast<double>((clock() - c0)) / CLOCKS_PER_SEC
              << std::endl;
}

Si tiene acceso a la biblioteca de Primitivas de rendimiento (IPP) de Intel, puede usar su clasificación de radix. Solo reemplaza

#include <tbb/parallel_sort.h>

con

#include "ipps.h"

y

tbb::parallel_sort(values.begin(), values.end());

con

std::vector<double> copy(values.size());
ippsSortRadixAscend_64f_I(&values[0], &copy[0], values.size());

En mi computadora portátil de doble núcleo, los tiempos son

C               16.4 s
C#              20 s
C++ std::sort   7.2 s
C++ tbb         5 s
C++ ipp         4.5 s
python          too long
Acero de Damasco
fuente
1
2.958s! ¡TBB parece genial y fácil de usar!
2
TBB es absurdamente asombroso. Es exactamente el nivel correcto de abstracción para el trabajo algorítmico.
drxzcl
5

¿Qué tal una implementación de clasificación rápida paralela que elige sus valores de pivote basados ​​en las estadísticas de la distribución, asegurando así particiones de igual tamaño? El primer pivote estaría en la media (cero en este caso), el siguiente par estaría en los percentiles 25 y 75 (+/- -0.67449 desviaciones estándar), y así sucesivamente, con cada partición dividiendo a la mitad el conjunto de datos restante más o menos perfectamente

codegardener
fuente
Eso es efectivamente lo que hice en la mía ... por supuesto que obtuviste esta publicación antes de que pudiera terminar mi redacción.
5

Muy feo (por qué usar matrices cuando puedo usar variables que terminan con números), pero código rápido (mi primer intento de std :: hilos), todo el tiempo (tiempo real) en mi sistema 1,8 s (en comparación con std :: sort () 4,8 s), compile con g ++ -std = c ++ 0x -O3 -march = native -pthread Simplemente pase los datos a través de stdin (funciona solo para 50M).

#include <cstdio>
#include <iostream>
#include <algorithm>
#include <thread>
using namespace std;
const size_t size=50000000;

void pivot(double* start,double * end, double middle,size_t& koniec){
    double * beg=start;
    end--;
    while (start!=end){
        if (*start>middle) swap (*start,*end--);
        else start++;
    }
    if (*end<middle) start+=1;
    koniec= start-beg;
}
void s(double * a, double* b){
    sort(a,b);
}
int main(){
    double *data=new double[size];
    FILE *f = fopen("gaussian.dat", "rb");
    fread(data,8,size,f);
    size_t end1,end2,end3,temp;
    pivot(data, data+size,0,end2);
    pivot(data, data+end2,-0.6745,end1);
    pivot(data+end2,data+size,0.6745,end3);
    end3+=end2;
    thread ts1(s,data,data+end1);
    thread ts2(s,data+end1,data+end2);
    thread ts3(s,data+end2,data+end3);
    thread ts4(s,data+end3,data+size);
    ts1.join(),ts2.join(),ts3.join(),ts4.join();
    //for (int i=0; i<size-1; i++){
    //  if (data[i]>data[i+1]) cerr<<"BLAD\n";
    //}
    fclose(f);
    //fwrite(data,8,size,stdout);
}

// Editar cambiado para leer el archivo gaussian.dat.

Zjarek
fuente
¿Podría cambiarlo para leer gaussian.dat, como lo hacen las soluciones C ++ anteriores?
Lo intentaré más tarde cuando llegue a casa.
static_rtti
Muy buena solución, ¡eres el líder actual (1.949s)! Y buen uso de la distribución gaussiana :)
static_rtti
4

Una solución C ++ usando std::sort(eventualmente más rápido que qsort, con respecto al rendimiento de qsort vs std :: sort )

#include <iostream>
#include <fstream>
#include <algorithm>
#include <vector>
#include <ctime>

int main(void)
{
    std::ifstream ifs("C:\\Temp\\gaussian.dat", std::ios::binary | std::ios::in);
    std::vector<double> values;
    values.reserve(50000000);
    double d;
    while (ifs.read(reinterpret_cast<char*>(&d), sizeof(double)))
        values.push_back(d);
    clock_t c0 = clock();
    std::sort(values.begin(), values.end());
    std::cout << "Finished after "
              << static_cast<double>((clock() - c0)) / CLOCKS_PER_SEC
              << std::endl;
}

No puedo decir gaussian.datcon certeza cuánto tiempo toma porque solo tengo 1 GB en mi máquina y con el código Python dado solo pude hacer un archivo con solo 25 millones de duplicados (sin obtener un error de memoria). Pero estoy muy interesado en cuánto tiempo se ejecuta el algoritmo std :: sort.

Christian Ammer
fuente
6.425s! Como era de esperar, C ++ brilla :)
@static_rtti: he probado el algoritmo Timsort de swensons (como lo sugirió Matthieu M. en su primera pregunta ). Tuve que hacer algunos cambios en el sort.harchivo para compilarlo con C ++. Era aproximadamente el doble de lento que std::sort. ¿No sé por qué, tal vez debido a las optimizaciones del compilador?
Christian Ammer
4

Aquí hay una mezcla del tipo de radix de Alexandru con el pivote inteligente roscado de Zjarek. Compilarlo con

g++ -std=c++0x -pthread -O3 -march=native sorter_gaussian_radix.cxx -o sorter_gaussian_radix

Puede cambiar el tamaño de la raíz definiendo STEP (por ejemplo, agregue -DSTEP = 11). Encontré que lo mejor para mi computadora portátil es 8 (el valor predeterminado).

Por defecto, divide el problema en 4 partes y lo ejecuta en múltiples hilos. Puede cambiar eso pasando un parámetro de profundidad a la línea de comando. Entonces, si tienes dos núcleos, ejecútalo como

sorter_gaussian_radix 50000000 1

y si tienes 16 núcleos

sorter_gaussian_radix 50000000 4

La profundidad máxima en este momento es 6 (64 hilos). Si coloca demasiados niveles, simplemente ralentizará el código.

Una cosa que también probé fue la clasificación por radix de la biblioteca Intel Performance Primitives (IPP). La implementación de Alexandru aplasta a IPP, con IPP siendo aproximadamente un 30% más lento. Esa variación también se incluye aquí (comentado).

#include <stdlib.h>
#include <string.h>
#include <algorithm>
#include <ctime>
#include <iostream>
#include <thread>
#include <vector>
#include <boost/cstdint.hpp>
// #include "ipps.h"

#ifndef STEP
#define STEP 8
#endif

const int step = STEP;
const int start_step=24;
const int num_steps=(64-start_step+step-1)/step;
int size;
double *dbuf, *copy;

clock_t c1, c2, c3, c4, c5;

const double distrib[]={-2.15387,
                        -1.86273,
                        -1.67594,
                        -1.53412,
                        -1.4178,
                        -1.31801,
                        -1.22986,
                        -1.15035,
                        -1.07752,
                        -1.00999,
                        -0.946782,
                        -0.887147,
                        -0.830511,
                        -0.776422,
                        -0.724514,
                        -0.67449,
                        -0.626099,
                        -0.579132,
                        -0.53341,
                        -0.488776,
                        -0.445096,
                        -0.40225,
                        -0.36013,
                        -0.318639,
                        -0.27769,
                        -0.237202,
                        -0.197099,
                        -0.157311,
                        -0.11777,
                        -0.0784124,
                        -0.0391761,
                        0,
                        0.0391761,
                        0.0784124,
                        0.11777,
                        0.157311,
                        0.197099,
                        0.237202,
                        0.27769,
                        0.318639,
                        0.36013,
                        0.40225,
                        0.445097,
                        0.488776,
                        0.53341,
                        0.579132,
                        0.626099,
                        0.67449,
                        0.724514,
                        0.776422,
                        0.830511,
                        0.887147,
                        0.946782,
                        1.00999,
                        1.07752,
                        1.15035,
                        1.22986,
                        1.31801,
                        1.4178,
                        1.53412,
                        1.67594,
                        1.86273,
                        2.15387};


class Distrib
{
  const int value;
public:
  Distrib(const double &v): value(v) {}

  bool operator()(double a)
  {
    return a<value;
  }
};


void recursive_sort(const int start, const int end,
                    const int index, const int offset,
                    const int depth, const int max_depth)
{
  if(depth<max_depth)
    {
      Distrib dist(distrib[index]);
      const int middle=std::partition(dbuf+start,dbuf+end,dist) - dbuf;

      // const int middle=
      //   std::partition(dbuf+start,dbuf+end,[&](double a)
      //                  {return a<distrib[index];})
      //   - dbuf;

      std::thread lower(recursive_sort,start,middle,index-offset,offset/2,
                        depth+1,max_depth);
      std::thread upper(recursive_sort,middle,end,index+offset,offset/2,
                        depth+1,max_depth);
      lower.join(), upper.join();
    }
  else
    {
  // ippsSortRadixAscend_64f_I(dbuf+start,copy+start,end-start);

      c1=clock();

      double *dbuf_local(dbuf), *copy_local(copy);
      boost::uint64_t mask = (1 << step) - 1;
      int cnt[num_steps][mask+1];

      boost::uint64_t *ibuf = reinterpret_cast<boost::uint64_t *> (dbuf_local);

      for(int i=0;i<num_steps;++i)
        for(uint j=0;j<mask+1;++j)
          cnt[i][j]=0;

      for (int i = start; i < end; i++)
        {
          for (int w = start_step, v = 0; w < 64; w += step, v++)
            {
              int p = (~ibuf[i] >> w) & mask;
              (cnt[v][p])++;
            }
        }

      c2=clock();

      std::vector<int> sum(num_steps,0);
      for (uint i = 0; i <= mask; i++)
        {
          for (int w = start_step, v = 0; w < 64; w += step, v++)
            {
              int tmp = sum[v] + cnt[v][i];
              cnt[v][i] = sum[v];
              sum[v] = tmp;
            }
        }

      c3=clock();

      for (int w = start_step, v = 0; w < 64; w += step, v++)
        {
          ibuf = reinterpret_cast<boost::uint64_t *>(dbuf_local);

          for (int i = start; i < end; i++)
            {
              int p = (~ibuf[i] >> w) & mask;
              copy_local[start+((cnt[v][p])++)] = dbuf_local[i];
            }
          std::swap(copy_local,dbuf_local);
        }

      // Do the last set of reversals
      for (int p = start; p < end; p++)
        if (dbuf_local[p] >= 0.)
          {
            std::reverse(dbuf_local+p, dbuf_local + end);
            break;
          }

      c4=clock();

      // Insertion sort
      for (int i = start+1; i < end; i++) {
        double value = dbuf_local[i];
        if (value < dbuf_local[i - 1]) {
          dbuf_local[i] = dbuf_local[i - 1];
          int p = i - 1;
          for (; p > 0 && value < dbuf_local[p - 1]; p--)
            dbuf_local[p] = dbuf_local[p - 1];
          dbuf_local[p] = value;
        }
      }
      c5=clock();

    }
}


int main(int argc, char **argv) {
  size = atoi(argv[1]);
  copy = new double[size];

  dbuf = new double[size];
  FILE *f = fopen("gaussian.dat", "r");
  fread(dbuf, size, sizeof(double), f);
  fclose(f);

  clock_t c0 = clock();

  const int max_depth= (argc > 2) ? atoi(argv[2]) : 2;

  // ippsSortRadixAscend_64f_I(dbuf,copy,size);

  recursive_sort(0,size,31,16,0,max_depth);

  if(num_steps%2==1)
    std::swap(dbuf,copy);

  // for (int i=0; i<size-1; i++){
  //   if (dbuf[i]>dbuf[i+1])
  //     std::cout << "BAD "
  //               << i << " "
  //               << dbuf[i] << " "
  //               << dbuf[i+1] << " "
  //               << "\n";
  // }

  std::cout << "Finished after "
            << (double) (c1 - c0) / CLOCKS_PER_SEC << " "
            << (double) (c2 - c1) / CLOCKS_PER_SEC << " "
            << (double) (c3 - c2) / CLOCKS_PER_SEC << " "
            << (double) (c4 - c3) / CLOCKS_PER_SEC << " "
            << (double) (c5 - c4) / CLOCKS_PER_SEC << " "
            << "\n";

  // delete [] dbuf;
  // delete [] copy;
  return 0;
}

EDITAR : implementé las mejoras de caché de Alexandru, y eso redujo aproximadamente el 30% del tiempo en mi máquina.

EDITAR : esto implementa un tipo recursivo, por lo que debería funcionar bien en la máquina de 16 núcleos de Alexandru. También usa la última mejora de Alexandru y elimina una de las reversas. Para mí, esto dio una mejora del 20%.

EDITAR : Se corrigió un error de señal que causaba ineficiencia cuando hay más de 2 núcleos.

EDITAR : se eliminó la lambda, por lo que se compilará con versiones anteriores de gcc. Incluye la variación del código IPP comentada. También arreglé la documentación para ejecutar en 16 núcleos. Por lo que puedo decir, esta es la implementación más rápida.

EDITAR : se corrigió un error cuando STEP no es 8. Se aumentó el número máximo de subprocesos a 64. Se agregó información de sincronización.

Acero de Damasco
fuente
Agradable. Radix sort es muy poco amigable con el caché. Vea si puede obtener mejores resultados cambiando step(11 fue óptimo en mi computadora portátil).
Alexandru
Tienes un error: int cnt[mask]debería serlo int cnt[mask + 1]. Para mejores resultados, use un valor fijo int cnt[1 << 16].
Alexandru
Probaré todas estas soluciones más tarde hoy cuando llegue a casa.
static_rtti
1.534s !!! Creo que tenemos un líder :-D
static_rtti
@static_rtti: ¿Podrías intentar esto de nuevo? Se ha vuelto significativamente más rápido que la última vez que lo probaste. En mi máquina, es sustancialmente más rápido que cualquier otra solución.
Damasco Steel
2

Supongo que esto realmente depende de lo que quieras hacer. Si quieres ordenar un grupo de gaussianos, entonces esto no te ayudará. Pero si quieres un montón de gaussianos ordenados, esto lo hará. Incluso si esto pierde un poco el problema, creo que será interesante comparar las rutinas de clasificación con las reales.

Si quieres que algo sea rápido, haz menos.

En lugar de generar un montón de muestras aleatorias de la distribución normal y luego ordenarlas, puede generar un montón de muestras de la distribución normal en orden ordenado.

Puede usar la solución aquí para generar n números aleatorios uniformes en orden ordenado. Luego puede usar el cdf inverso (scipy.stats.norm.ppf) de la distribución normal para convertir los números aleatorios uniformes en números de la distribución normal a través del muestreo de transformación inversa .

import scipy.stats
import random

# slightly modified from linked stackoverflow post
def n_random_numbers_increasing(n):
  """Like sorted(random() for i in range(n))),                                
  but faster because we avoid sorting."""
  v = 1.0
  while n:
    v *= random.random() ** (1.0 / n)
    yield 1 - v
    n -= 1

def n_normal_samples_increasing(n):
  return map(scipy.stats.norm.ppf, n_random_numbers_increasing(n))

Si quiere ensuciarse las manos, supongo que podría acelerar los numerosos cálculos de cdf inversos utilizando algún tipo de método iterativo y utilizando el resultado anterior como su suposición inicial. Dado que las conjeturas van a ser muy cercanas, probablemente una sola iteración le dará una gran precisión.

rrenaud
fuente
2
Buena respuesta, pero eso sería hacer trampa :) La idea de mi pregunta es que si bien los algoritmos de clasificación han recibido una atención enorme, casi no hay literatura sobre el uso de conocimientos previos sobre los datos para la clasificación, a pesar de los pocos documentos que tienen abordado el tema han reportado buenas ganancias. ¡Entonces veamos qué es posible!
2

Intente esto cambiando la solución de Guvante con este Main (), comienza a ordenar tan pronto como se realiza la lectura de 1/4 IO, es más rápido en mi prueba:

    static void Main(string[] args)
    {
        FileStream filestream = new FileStream(@"..\..\..\gaussian.dat", FileMode.Open, FileAccess.Read);
        doubles = new double[][] { new double[count / 4], new double[count / 4], new double[count / 4], new double[count / 4] };
        Thread[] threads = new Thread[4];

        for (int i = 0; i < 4; i++)
        {
            byte[] bytes = new byte[count * 4];
            filestream.Read(bytes, 0, count * 4);

            for (int j = 0; j < count / 4; j++)
            {
                doubles[i][j] = BitConverter.ToDouble(bytes, i * count/4 + j * 8);
            }

            threads[i] = new Thread(ThreadStart);
            waiting[i] = events[i] = new AutoResetEvent(false);
            threads[i].Start(i);    
        }

        WaitHandle.WaitAll(waiting);
        double[] left = Merge(doubles[0], doubles[1]);
        double[] right = Merge(doubles[2], doubles[3]);
        double[] result = Merge(left, right);
        Console.ReadKey();
    }
}

fuente
8.933s. Ligeramente más rápido :)
2

Dado que conoce la distribución, mi idea sería hacer k cubos, cada uno con el mismo número esperado de elementos (dado que conoce la distribución, puede calcular esto). Luego, en el tiempo O (n), barra la matriz y coloque los elementos en sus cubos.

Luego, simultáneamente, clasifique los cubos. Supongamos que tiene k cubos y n elementos. Un cubo tomará (n / k) lg (n / k) tiempo para ordenar. Ahora suponga que tiene procesadores p que puede usar. Dado que los cubos se pueden clasificar de forma independiente, tiene un multiplicador de techo (k / p) con el que lidiar. Esto proporciona un tiempo de ejecución final de n + ceil (k / p) * (n / k) lg (n / k), que debería ser mucho más rápido que n lg n si elige k bien.

mdkess
fuente
Creo que esta es la mejor solución.
Neil G
No sabes exactamente la cantidad de elementos que terminarán en un cubo, por lo que las matemáticas son realmente incorrectas. Dicho esto, creo que esta es una buena respuesta.
poulejapon
@pouejapon: Tienes razón.
Neil G
Esta respuesta suena muy bien. El problema es que no es realmente rápido. Implementé esto en C99 (vea mi respuesta), y ciertamente supera fácilmente std::sort(), pero es mucho más lento que la solución radixsort de Alexandru.
Sven Marnach
2

Una idea de optimización de bajo nivel es colocar dos dobles en un registro SSE, por lo que cada subproceso funcionaría con dos elementos a la vez. Esto puede ser complicado de hacer para algunos algoritmos.

Otra cosa que hacer es ordenar la matriz en trozos compatibles con la caché y luego fusionar los resultados. Deben usarse dos niveles: por ejemplo, primero 4 KB para L1 y luego 64 KB para L2.

Esto debería ser muy amigable con el caché, ya que la clasificación del depósito no saldrá del caché, y la fusión final recorrerá la memoria secuencialmente.

En estos días, la computación es mucho más barata que los accesos a la memoria. Sin embargo, tenemos una gran cantidad de elementos, por lo que es difícil saber cuál es el tamaño de la matriz cuando la clasificación tonta con reconocimiento de caché es más lenta que una versión de baja complejidad sin reconocimiento de caché.

Pero no proporcionaré una implementación de lo anterior ya que lo haría en Windows (VC ++).


fuente
2

Aquí hay una implementación de clasificación de cubeta de exploración lineal. Creo que es más rápido que todas las implementaciones actuales de un solo subproceso, excepto para la clasificación de radix. Debería tener un tiempo de ejecución lineal esperado si estoy estimando el cdf con suficiente precisión (estoy usando la interpolación lineal de valores que encontré en la web) y no he cometido ningún error que pueda causar un escaneo excesivo:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <algorithm>
#include <ctime>

using std::fill;

const double q[] = {
  0.0,
  9.865E-10,
  2.8665150000000003E-7,
  3.167E-5,
  0.001349898,
  0.022750132,
  0.158655254,
  0.5,
  0.8413447460000001,
  0.9772498679999999,
  0.998650102,
  0.99996833,
  0.9999997133485,
  0.9999999990134999,
  1.0,
};
int main(int argc, char** argv) {
  if (argc <= 1)
    return puts("No argument!");
  unsigned count = atoi(argv[1]);
  unsigned count2 = 3 * count;

  bool *ba = new bool[count2 + 1000];
  fill(ba, ba + count2 + 1000, false);
  double *a = new double[count];
  double *c = new double[count2 + 1000];

  FILE *f = fopen("gaussian.dat", "rb");
  if (fread(a, 8, count, f) != count)
    return puts("fread failed!");
  fclose(f);

  int i;
  int j;
  bool s;
  int t;
  double z;
  double p;
  double d1;
  double d2;
  for (i = 0; i < count; i++) {
    s = a[i] < 0;
    t = a[i];
    if (s) t--;
    z = a[i] - t;
    t += 7;
    if (t < 0) {
      t = 0;
      z = 0;
    } else if (t >= 14) {
      t = 13;
      z = 1;
    }
    p = q[t] * (1 - z) + q[t + 1] * z;
    j = count2 * p;
    while (ba[j] && c[j] < a[i]) {
      j++;
    }
    if (!ba[j]) {
      ba[j] = true;
      c[j] = a[i];
    } else {
      d1 = c[j];
      c[j] = a[i];
      j++;
      while (ba[j]) {
        d2 = c[j];
        c[j] = d1;
        d1 = d2;
        j++;
      }
      c[j] = d1;
      ba[j] = true;
    }
  }
  i = 0;
  int max = count2 + 1000;
  for (j = 0; j < max; j++) {
    if (ba[j]) {
      a[i++] = c[j];
    }
  }
  // for (i = 0; i < count; i += 1) {
  //   printf("here %f\n", a[i]);
  // }
  return 0;
}
jonderry
fuente
1
Intentaré esto más tarde hoy cuando llegue a casa. Mientras tanto, ¿puedo decir que tu código es muy feo? :-D
static_rtti
3.071s! ¡No está mal para una solución de un solo subproceso!
static_rtti
2

No sé por qué no puedo editar mi publicación anterior, así que aquí está la nueva versión, 0,2 segundos más rápido (pero aproximadamente 1,5 s más rápido en tiempo de CPU (usuario)). Esta solución tiene 2 programas, primero calcula previamente los cuantiles para la distribución normal para la clasificación de cubetas, y los almacena en la tabla, t [doble * escala] = índice de cubetas, donde la escala es un número arbitrario que hace posible la conversión al doble. Entonces el programa principal puede usar estos datos para colocar los dobles en el cubo correcto. Tiene un inconveniente, si los datos no son gaussianos, no funcionará correctamente (y también hay casi cero posibilidades de trabajar incorrectamente para la distribución normal), pero la modificación para un caso especial es fácil y rápida (solo el número de comprobaciones de cubos y caer a estándar) ::ordenar()).

Compilación: g ++ => http://pastebin.com/WG7pZEzH programa de ayuda

g ++ -std = c ++ 0x -O3 -march = native -pthread => http://pastebin.com/T3yzViZP programa de clasificación principal

Zjarek
fuente
1.621s! Creo que eres el líder, pero estoy perdiendo rápidamente la pista con todas estas respuestas :)
static_rtti
2

Aquí hay otra solución secuencial. Éste utiliza el hecho de que los elementos están distribuidos normalmente, y creo que la idea es generalmente aplicable para obtener una clasificación cercana al tiempo lineal.

El algoritmo es así:

  • CDF aproximado (ver phi()función en la implementación)
  • Para todos los elementos, calcule la posición aproximada en la matriz ordenada: size * phi(x)
  • Coloque elementos en una nueva matriz cerca de su posición final
    • En mi implementación, la matriz de destino tiene algunos vacíos, por lo que no tengo que cambiar demasiados elementos al insertar.
  • Utilice la inserción para ordenar los elementos finales (la inserción es lineal si la distancia a la posición final es menor que una constante).

Desafortunadamente, la constante oculta es bastante grande y esta solución es dos veces más lenta que el algoritmo de clasificación de radix.

Alexandru
fuente
1
2.470s! Muy buenas ideas. No importa que la solución no sea la más rápida si las ideas son interesantes :)
static_rtti
1
Esto es lo mismo que el mío, pero agrupando los cálculos de phi y los cambios juntos para un mejor rendimiento de caché, ¿verdad?
jonderry
@ jonderry: Elegí tu solución, ahora que entiendo lo que hace. No quise robar tu idea. Incluí su implementación en mi conjunto (no oficial) de pruebas
Alexandru
2

Mi favorito personal usando los bloques de construcción roscados de Intel ya se ha publicado, pero aquí hay una solución paralela cruda usando JDK 7 y su nueva API fork / join:

import java.io.FileInputStream;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.concurrent.*;
import static java.nio.channels.FileChannel.MapMode.READ_ONLY;
import static java.nio.ByteOrder.LITTLE_ENDIAN;


/**
 * 
 * Original Quicksort: https://github.com/pmbauer/parallel/tree/master/src/main/java/pmbauer/parallel
 *
 */
public class ForkJoinQuicksortTask extends RecursiveAction {

    public static void main(String[] args) throws Exception {

        double[] array = new double[Integer.valueOf(args[0])];

        FileChannel fileChannel = new FileInputStream("gaussian.dat").getChannel();
        fileChannel.map(READ_ONLY, 0, fileChannel.size()).order(LITTLE_ENDIAN).asDoubleBuffer().get(array);

        ForkJoinPool mainPool = new ForkJoinPool();

        System.out.println("Starting parallel computation");

        mainPool.invoke(new ForkJoinQuicksortTask(array));        
    }

    private static final long serialVersionUID = -642903763239072866L;
    private static final int SERIAL_THRESHOLD = 0x1000;

    private final double a[];
    private final int left, right;

    public ForkJoinQuicksortTask(double[] a) {this(a, 0, a.length - 1);}

    private ForkJoinQuicksortTask(double[] a, int left, int right) {
        this.a = a;
        this.left = left;
        this.right = right;
    }

    @Override
    protected void compute() {
        if (right - left < SERIAL_THRESHOLD) {
            Arrays.sort(a, left, right + 1);
        } else {
            int pivotIndex = partition(a, left, right);
            ForkJoinTask<Void> t1 = null;

            if (left < pivotIndex)
                t1 = new ForkJoinQuicksortTask(a, left, pivotIndex).fork();
            if (pivotIndex + 1 < right)
                new ForkJoinQuicksortTask(a, pivotIndex + 1, right).invoke();

            if (t1 != null)
                t1.join();
        }
    }

    public static int partition(double[] a, int left, int right) {
        // chose middle value of range for our pivot
        double pivotValue = a[left + (right - left) / 2];

        --left;
        ++right;

        while (true) {
            do
                ++left;
            while (a[left] < pivotValue);

            do
                --right;
            while (a[right] > pivotValue);

            if (left < right) {
                double tmp = a[left];
                a[left] = a[right];
                a[right] = tmp;
            } else {
                return right;
            }
        }
    }    
}

Descargo de responsabilidad importante : tomé la adaptación de clasificación rápida para fork / join de: https://github.com/pmbauer/parallel/tree/master/src/main/java/pmbauer/parallel

Para ejecutar esto, necesita una versión beta de JDK 7 (http://jdk7.java.net/download.html).

En mi 2.93Ghz Quad core i7 (OS X):

Referencia de Python

time python sort.py 50000000
sorting...

real    1m13.885s
user    1m11.942s
sys     0m1.935s

Java JDK 7 fork / join

time java ForkJoinQuicksortTask 50000000
Starting parallel computation

real    0m2.404s
user    0m10.195s
sys     0m0.347s

También intenté experimentar un poco con la lectura paralela y convertir los bytes a dobles, pero no vi ninguna diferencia allí.

Actualizar:

Si alguien quiere experimentar con la carga paralela de los datos, la versión de carga paralela está a continuación. En teoría, esto podría hacerlo ir un poco más rápido aún, si su dispositivo IO tiene suficiente capacidad paralela (los SSD generalmente lo hacen). También hay algo de sobrecarga en la creación de Dobles a partir de bytes, por lo que también podría ir más rápido en paralelo. En mis sistemas (Ubuntu 10.10 / Nehalem Quad / Intel X25M SSD y OS X 10.6 / i7 Quad / Samsung SSD) no vi ninguna diferencia real.

import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static java.nio.channels.FileChannel.MapMode.READ_ONLY;

import java.io.FileInputStream;
import java.nio.DoubleBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;


/**
 *
 * Original Quicksort: https://github.com/pmbauer/parallel/tree/master/src/main/java/pmbauer/parallel
 *
 */
public class ForkJoinQuicksortTask extends RecursiveAction {

   public static void main(String[] args) throws Exception {

       ForkJoinPool mainPool = new ForkJoinPool();

       double[] array = new double[Integer.valueOf(args[0])];
       FileChannel fileChannel = new FileInputStream("gaussian.dat").getChannel();
       DoubleBuffer buffer = fileChannel.map(READ_ONLY, 0, fileChannel.size()).order(LITTLE_ENDIAN).asDoubleBuffer();

       mainPool.invoke(new ReadAction(buffer, array, 0, array.length));
       mainPool.invoke(new ForkJoinQuicksortTask(array));
   }

   private static final long serialVersionUID = -642903763239072866L;
   private static final int SERIAL_THRESHOLD = 0x1000;

   private final double a[];
   private final int left, right;

   public ForkJoinQuicksortTask(double[] a) {this(a, 0, a.length - 1);}

   private ForkJoinQuicksortTask(double[] a, int left, int right) {
       this.a = a;
       this.left = left;
       this.right = right;
   }

   @Override
   protected void compute() {
       if (right - left < SERIAL_THRESHOLD) {
           Arrays.sort(a, left, right + 1);
       } else {
           int pivotIndex = partition(a, left, right);
           ForkJoinTask<Void> t1 = null;

           if (left < pivotIndex)
               t1 = new ForkJoinQuicksortTask(a, left, pivotIndex).fork();
           if (pivotIndex + 1 < right)
               new ForkJoinQuicksortTask(a, pivotIndex + 1, right).invoke();

           if (t1 != null)
               t1.join();
       }
   }

   public static int partition(double[] a, int left, int right) {
       // chose middle value of range for our pivot
       double pivotValue = a[left + (right - left) / 2];

       --left;
       ++right;

       while (true) {
           do
               ++left;
           while (a[left] < pivotValue);

           do
               --right;
           while (a[right] > pivotValue);

           if (left < right) {
               double tmp = a[left];
               a[left] = a[right];
               a[right] = tmp;
           } else {
               return right;
           }
       }
   }

}

class ReadAction extends RecursiveAction {

   private static final long serialVersionUID = -3498527500076085483L;

   private final DoubleBuffer buffer;
   private final double[] array;
   private final int low, high;

   public ReadAction(DoubleBuffer buffer, double[] array, int low, int high) {
       this.buffer = buffer;
       this.array = array;
       this.low = low;
       this.high = high;
   }

   @Override
   protected void compute() {
       if (high - low < 100000) {
           buffer.position(low);
           buffer.get(array, low, high-low);
       } else {
           int middle = (low + high) >>> 1;

           invokeAll(new ReadAction(buffer.slice(), array, low, middle),  new ReadAction(buffer.slice(), array, middle, high));
       }
   }
}

Actualización2:

Ejecuté el código en una de nuestras máquinas de desarrollo de 12 núcleos con una ligera modificación para establecer una cantidad fija de núcleos. Esto dio los siguientes resultados:

Cores  Time
1      7.568s
2      3.903s
3      3.325s
4      2.388s
5      2.227s
6      1.956s
7      1.856s
8      1.827s
9      1.682s
10     1.698s
11     1.620s
12     1.503s

En este sistema también probé la versión de Python que tomó 1m2.994s y la versión C ++ de Zjarek que tomó 1.925s (por alguna razón, la versión C ++ de Zjarek parece correr relativamente más rápido en la computadora static_rtti).

También probé lo que sucedió si duplicaba el tamaño del archivo a 100,000,000 de dobles:

Cores  Time
1      15.056s
2      8.116s
3      5.925s
4      4.802s
5      4.430s
6      3.733s
7      3.540s
8      3.228s
9      3.103s
10     2.827s
11     2.784s
12     2.689s

En este caso, la versión C ++ de Zjarek tomó 3.968s. Python solo tardó demasiado aquí.

150,000,000 de dobles:

Cores  Time
1      23.295s
2      12.391s
3      8.944s
4      6.990s
5      6.216s
6      6.211s
7      5.446s
8      5.155s
9      4.840s
10     4.435s
11     4.248s
12     4.174s

En este caso, la versión C ++ de Zjarek fue 6.044s. Ni siquiera intenté Python.

La versión C ++ es muy consistente con sus resultados, donde Java se balancea un poco. Primero se vuelve un poco más eficiente cuando el problema se agrava, pero luego vuelve a ser menos eficiente.

Arjan
fuente
1
Este código no analiza los valores dobles correctamente para mí. ¿Se requiere Java 7 para analizar correctamente los valores del archivo?
jonderry
1
Ah, tonto de mí. Olvidé establecer el endianness nuevamente después de refactorizar localmente el código IO de varias líneas a una. Java 7 normalmente sería necesario, a menos que agregue fork / join por separado a Java 6, por supuesto.
arjan
3.411s en mi máquina. No está mal, pero es más lento que la solución java de koumes21 :)
static_rtti
1
Probaré la solución de koumes21 aquí también localmente para ver cuáles son las diferencias relativas en mi sistema. De todos modos, no hay vergüenza en 'perder' de koumes21 ya que es una solución mucho más inteligente. Esto es solo una clasificación rápida casi estándar lanzada en un grupo fork / join;)
arjan
1

Una versión con pthreads tradicionales. Código de fusión copiado de la respuesta de Guvante. Compilar con g++ -O3 -pthread.

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <algorithm>

static unsigned int nthreads = 4;
static unsigned int size = 50000000;

typedef struct {
  double *array;
  int size;
} array_t;


void 
merge(double *left, int leftsize,
      double *right, int rightsize,
      double *result)
{
  int l = 0, r = 0, insertat = 0;
  while (l < leftsize && r < rightsize) {
    if (left[l] < right[r])
      result[insertat++] = left[l++];
    else
      result[insertat++] = right[r++];
  }

  while (l < leftsize) result[insertat++] = left[l++];
  while (r < rightsize) result[insertat++] = right[r++];
}


void *
run_thread(void *input)
{
  array_t numbers = *(array_t *)input;
  std::sort(numbers.array, numbers.array+numbers.size); 
  pthread_exit(NULL);
}

int 
main(int argc, char **argv) 
{
  double *numbers = (double *) malloc(size * sizeof(double));

  FILE *f = fopen("gaussian.dat", "rb");
  if (fread(numbers, sizeof(double), size, f) != size)
    return printf("Reading gaussian.dat failed");
  fclose(f);

  array_t worksets[nthreads];
  int worksetsize = size / nthreads;
  for (int i = 0; i < nthreads; i++) {
    worksets[i].array=numbers+(i*worksetsize);
    worksets[i].size=worksetsize;
  }

  pthread_attr_t attributes;
  pthread_attr_init(&attributes);
  pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_JOINABLE);

  pthread_t threads[nthreads];
  for (int i = 0; i < nthreads; i++) {
    pthread_create(&threads[i], &attributes, &run_thread, &worksets[i]);
  }

  for (int i = 0; i < nthreads; i++) {
    pthread_join(threads[i], NULL);
  }

  double *tmp = (double *) malloc(size * sizeof(double));
  merge(numbers, worksetsize, numbers+worksetsize, worksetsize, tmp);
  merge(numbers+(worksetsize*2), worksetsize, numbers+(worksetsize*3), worksetsize, tmp+(size/2));
  merge(tmp, worksetsize*2, tmp+(size/2), worksetsize*2, numbers);

  /*
  printf("Verifying result..\n");
  for (int i = 0; i < size - 1; i++) {
    if (numbers[i] > numbers[i+1])
      printf("Result is not correct\n");
  }
  */

  pthread_attr_destroy(&attributes);
  return 0;
}  

En mi computadora portátil obtengo los siguientes resultados:

real    0m6.660s
user    0m9.449s
sys     0m1.160s
Vendedor
fuente
1

Aquí hay una implementación secuencial de C99 que intenta realmente hacer uso de la distribución conocida. Básicamente, realiza una sola ronda de clasificación de cubetas utilizando la información de distribución, luego unas pocas rondas de clasificación rápida en cada cubeta suponiendo una distribución uniforme dentro de los límites de la cubeta y, finalmente, una clasificación de selección modificada para copiar los datos de nuevo al búfer original. La clasificación rápida memoriza los puntos divididos, por lo que la clasificación por selección solo necesita operar en pequeños pedazos. Y a pesar (¿porque?) De toda esa complejidad, ni siquiera es realmente rápido.

Para hacer que la evaluación sea rápida, los valores se muestrean en unos pocos puntos y luego solo se utiliza la interpolación lineal. En realidad, no importa si Φ se evalúa exactamente, siempre y cuando la aproximación sea estrictamente monotónica.

Los tamaños de los contenedores se eligen de manera que la posibilidad de un desbordamiento del contenedor sea insignificante. Más precisamente, con los parámetros actuales, la posibilidad de que un conjunto de datos de 50000000 elementos cause un desbordamiento de bin es 3.65e-09. (Esto se puede calcular utilizando la función de supervivencia de la distribución de Poisson ).

Para compilar, utilice

gcc -std=c99 -msse3 -O3 -ffinite-math-only

Como hay mucho más cálculo que en las otras soluciones, estos indicadores de compilación son necesarios para que sea al menos razonablemente rápido. Sin -msse3las conversiones de doublea intconvertirse muy lento. Si su arquitectura no es compatible con SSE3, estas conversiones también se pueden hacer usando la lrint()función.

El código es bastante feo, no estoy seguro si cumple con el requisito de ser "razonablemente legible" ...

#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <math.h>

#define N 50000000
#define BINSIZE 720
#define MAXBINSIZE 880
#define BINCOUNT (N / BINSIZE)
#define SPLITS 64
#define PHI_VALS 513

double phi_vals[PHI_VALS];

int bin_index(double x)
{
    double y = (x + 8.0) * ((PHI_VALS - 1) / 16.0);
    int interval = y;
    y -= interval;
    return (1.0 - y) * phi_vals[interval] + y * phi_vals[interval + 1];
}

double bin_value(int bin)
{
    int left = 0;
    int right = PHI_VALS - 1;
    do
    {
        int centre = (left + right) / 2;
        if (bin < phi_vals[centre])
            right = centre;
        else
            left = centre;
    } while (right - left > 1);
    double frac = (bin - phi_vals[left]) / (phi_vals[right] - phi_vals[left]);
    return (left + frac) * (16.0 / (PHI_VALS - 1)) - 8.0;
}

void gaussian_sort(double *restrict a)
{
    double *b = malloc(BINCOUNT * MAXBINSIZE * sizeof(double));
    double **pos = malloc(BINCOUNT * sizeof(double*));
    for (size_t i = 0; i < BINCOUNT; ++i)
        pos[i] = b + MAXBINSIZE * i;
    for (size_t i = 0; i < N; ++i)
        *pos[bin_index(a[i])]++ = a[i];
    double left_val, right_val = bin_value(0);
    for (size_t bin = 0, i = 0; bin < BINCOUNT; ++bin)
    {
        left_val = right_val;
        right_val = bin_value(bin + 1);
        double *splits[SPLITS + 1];
        splits[0] = b + bin * MAXBINSIZE;
        splits[SPLITS] = pos[bin];
        for (int step = SPLITS; step > 1; step >>= 1)
            for (int left_split = 0; left_split < SPLITS; left_split += step)
            {
                double *left = splits[left_split];
                double *right = splits[left_split + step] - 1;
                double frac = (double)(left_split + (step >> 1)) / SPLITS;
                double pivot = (1.0 - frac) * left_val + frac * right_val;
                while (1)
                {
                    while (*left < pivot && left <= right)
                        ++left;
                    while (*right >= pivot && left < right)
                        --right;
                    if (left >= right)
                        break;
                    double tmp = *left;
                    *left = *right;
                    *right = tmp;
                    ++left;
                    --right;
                }
                splits[left_split + (step >> 1)] = left;
            }
        for (int left_split = 0; left_split < SPLITS; ++left_split)
        {
            double *left = splits[left_split];
            double *right = splits[left_split + 1] - 1;
            while (left <= right)
            {
                double *min = left;
                for (double *tmp = left + 1; tmp <= right; ++tmp)
                    if (*tmp < *min)
                        min = tmp;
                a[i++] = *min;
                *min = *right--;
            }
        }
    }
    free(b);
    free(pos);
}

int main()
{
    double *a = malloc(N * sizeof(double));
    FILE *f = fopen("gaussian.dat", "rb");
    assert(fread(a, sizeof(double), N, f) == N);
    fclose(f);
    for (int i = 0; i < PHI_VALS; ++i)
    {
        double x = (i * (16.0 / PHI_VALS) - 8.0) / sqrt(2.0);
        phi_vals[i] =  (erf(x) + 1.0) * 0.5 * BINCOUNT;
    }
    gaussian_sort(a);
    free(a);
}
Sven Marnach
fuente
4.098s! Tuve que agregar -lm para compilarlo (para erf).
static_rtti
1
#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include <memory.h>
#include <algorithm>

// maps [-inf,+inf] to (0,1)
double normcdf(double x) {
        return 0.5 * (1 + erf(x * M_SQRT1_2));
}

int calcbin(double x, int bins) {
        return (int)floor(normcdf(x) * bins);
}

int *docensus(int bins, int n, double *arr) {
        int *hist = calloc(bins, sizeof(int));
        int i;
        for(i = 0; i < n; i++) {
                hist[calcbin(arr[i], bins)]++;
        }
        return hist;
}

void partition(int bins, int *orig_counts, double *arr) {
        int *counts = malloc(bins * sizeof(int));
        memcpy(counts, orig_counts, bins*sizeof(int));
        int *starts = malloc(bins * sizeof(int));
        int b, i;
        starts[0] = 0;
        for(i = 1; i < bins; i++) {
                starts[i] = starts[i-1] + counts[i-1];
        }
        for(b = 0; b < bins; b++) {
                while (counts[b] > 0) {
                        double v = arr[starts[b]];
                        int correctbin;
                        do {
                                correctbin = calcbin(v, bins);
                                int swappos = starts[correctbin];
                                double tmp = arr[swappos];
                                arr[swappos] = v;
                                v = tmp;
                                starts[correctbin]++;
                                counts[correctbin]--;
                        } while (correctbin != b);
                }
        }
        free(counts);
        free(starts);
}


void sortbins(int bins, int *counts, double *arr) {
        int start = 0;
        int b;
        for(b = 0; b < bins; b++) {
                std::sort(arr + start, arr + start + counts[b]);
                start += counts[b];
        }
}


void checksorted(double *arr, int n) {
        int i;
        for(i = 1; i < n; i++) {
                if (arr[i-1] > arr[i]) {
                        printf("out of order at %d: %lf %lf\n", i, arr[i-1], arr[i]);
                        exit(1);
                }
        }
}


int main(int argc, char *argv[]) {
        if (argc == 1 || argv[1] == NULL) {
                printf("Expected data size as argument\n");
                exit(1);
        }
        int n = atoi(argv[1]);
        const int cachesize = 128 * 1024; // a guess
        int bins = (int) (1.1 * n * sizeof(double) / cachesize);
        if (argc > 2) {
                bins = atoi(argv[2]);
        }
        printf("Using %d bins\n", bins);
        FILE *f = fopen("gaussian.dat", "rb");
        if (f == NULL) {
                printf("Couldn't open gaussian.dat\n");
                exit(1);
        }
        double *arr = malloc(n * sizeof(double));
        fread(arr, sizeof(double), n, f);
        fclose(f);

        int *counts = docensus(bins, n, arr);
        partition(bins, counts, arr);
        sortbins(bins, counts, arr);
        checksorted(arr, n);

        return 0;
}

Esto usa erf () para colocar cada elemento apropiadamente en un bin, luego ordena cada bin. Mantiene la matriz completamente en su lugar.

Primer paso: docensus () cuenta el número de elementos en cada contenedor.

Segunda pasada: la partición () permuta la matriz, colocando cada elemento en su contenedor apropiado

Tercer paso: sortbins () realiza un qsort en cada bin.

Es un poco ingenuo, y llama a la costosa función erf () dos veces por cada valor. El primer y tercer pases son potencialmente paralelizables. El segundo es altamente serial y probablemente se ralentiza por sus patrones de acceso a memoria altamente aleatorios. También podría valer la pena almacenar en caché el número de bin de cada doble, dependiendo de la relación de potencia de la CPU a la velocidad de la memoria.

Este programa le permite elegir la cantidad de contenedores que usará. Simplemente agregue un segundo número a la línea de comando. Lo compilé con gcc -O3, pero mi máquina es tan débil que no puedo decirle ningún buen número de rendimiento.

Editar: Poof! Mi programa C se ha transformado mágicamente en un programa C ++ usando std :: sort!

frud
fuente
Puede usar phi para un stdnormal_cdf más rápido.
Alexandru
¿Cuántos contenedores debo poner, aproximadamente?
static_rtti
@Alexandru: agregué una aproximación lineal por partes a normcdf y solo obtuve alrededor del 5% de velocidad.
frud
@static_rtti: No tienes que poner ninguno. De forma predeterminada, el código elige el recuento de ubicaciones, por lo que el tamaño promedio de la ubicación es 10/11 de 128 kb. Muy pocos contenedores y no obtienes el beneficio de particionar. Demasiados y la fase de partición se atasca debido al desbordamiento de la memoria caché.
frud
10.6s! Intenté jugar un poco con el número de bins, y obtuve los mejores resultados con 5000 (un poco por encima del valor predeterminado de 3356). Debo decir que esperaba ver un rendimiento mucho mejor para su solución ... ¿Tal vez es el hecho de que está usando qsort en lugar de std :: tipo de soluciones C ++ potencialmente más rápidas?
static_rtti
1

Eche un vistazo a la implementación de clasificación de radix por Michael Herf ( Radix Tricks ). En mi máquina, la clasificación fue 5 veces más rápida en comparación con el std::sortalgoritmo de mi primera respuesta. El nombre de la función de clasificación es RadixSort11.

int main(void)
{
    std::ifstream ifs("C:\\Temp\\gaussian.dat", std::ios::binary | std::ios::in);
    std::vector<float> v;
    v.reserve(50000000);
    double d;
    while (ifs.read(reinterpret_cast<char*>(&d), sizeof(double)))
        v.push_back(static_cast<float>(d));
    std::vector<float> vres(v.size(), 0.0);
    clock_t c0 = clock();
    RadixSort11(&v[0], &vres[0], v.size());
    std::cout << "Finished after: "
              << static_cast<double>(clock() - c0) / CLOCKS_PER_SEC << std::endl;
    return 0;
}
Christian Ammer
fuente