Doctrine Entity Manager with multiple threads persisting and flushing


Problem description:

class ProbeJob extends \Stackable
{
    public function __construct($monitor)
    {
        $this->monitor = $monitor;
    }
    public function run()
    {
        curl_setopt($this->ch, CURLOPT_URL, $this->monitor->getIp());
        $request = json_decode(curl_exec($this->ch), true);
        //some more db transactions
        $this->em->persist
        (
            (new Attempt())
                ->setMonitor($this->monitor)
                ->setTimestamp(new \DateTime())
                ->setLatency($request['latency'])
        );
        $this->em->flush();
    }
}

class ProbeWorker extends \Worker
{
    public function __construct($em)
    {
        $this->em = $em;
        $this->ch = curl_init();
    }
    public function run()
    {
        curl_setopt($this->ch, CURLOPT_RETURNTRANSFER, true);
    }
}

class DogeCommand extends ContainerAwareCommand
{
    protected function configure()
    {
        $this
            ->setName('wow:doge')
            ->setDescription('such speed')
        ;
    }
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $em = $this->getContainer()->get('doctrine')->getManager();
        $monitors = $em->getRepository('WowABundle:Monitor')->findAll();
        $worker = new ProbeWorker($em);
        $worker->start();
        foreach($monitors as $monitor)
        {
            $job = new ProbeJob($monitor);
            $worker->stack($job);
        }
    }
}

This gives me an

  [PDOException]                                     
  You cannot serialize or unserialize PDO instances 

error. What does that even mean? I profiled the old implementation, which processed the monitors serially, and found that half of the run time went to flushing. Splitting the flushes into smaller chunks didn't help. Each iteration of the loop takes 1 second, so parallelizing it, assuming the hardware is capable, should significantly reduce the run time and let it scale to more websites. Is there a way to do this? How do I fix this?

I am sure that the jobs are truly independent of each other, so there wouldn't be any conflicting db transactions. Everything is an insert or a read.

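The worker should initialize curl in its run method, not in the constructor: the constructor executes in the thread that creates the worker, whereas run executes inside the worker thread itself.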

The $ch resource is then available to stackables via $this->worker->ch during their execution.

The error you are experiencing occurs because you are writing $em to the object scope. The entity manager does not descend from pthreads and so is not thread safe; pthreads therefore attempts to serialize the object for safe storage, but the object is not serializable, so you get the error you have shown.
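The serialization failure can be reproduced without any threads. The following is a minimal sketch, assuming the pdo_sqlite driver is installed; the `ConnectionHolder` class is a hypothetical stand-in for any object that, like the entity manager, ends up holding a PDO connection.

```php
<?php
// Hypothetical stand-in for an object graph that contains a PDO instance.
class ConnectionHolder
{
    public $pdo;

    public function __construct(PDO $pdo)
    {
        $this->pdo = $pdo;
    }
}

// Assumption: pdo_sqlite is available; other PDO drivers should behave alike.
$holder = new ConnectionHolder(new PDO('sqlite::memory:'));

$message = null;
try {
    // Serializing the holder walks its members and reaches the PDO instance.
    serialize($holder);
} catch (\Exception $e) {
    // A PDOException on older PHP versions, a plain Exception on newer ones;
    // the exact wording of the message differs between versions.
    $message = $e->getMessage();
}

echo $message === null ? "serialized\n" : "not serializable: $message\n";
```

pthreads goes through the same serialization step when a non-pthreads object is assigned to a member of a Worker or Stackable, which is why the exception surfaces there.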

The workaround is to avoid setting that member and instead initialize an instance of the object for each Worker. I would probably store it in static scope, to avoid serialization and the unnecessary overhead of a mutex when accessing these complex objects.
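Put together, the advice above might look roughly like the sketch below. It is untested and rests on several assumptions: it needs the pthreads extension, `createEntityManager()` is a hypothetical helper you would have to write yourself (it is not part of Doctrine or pthreads), and passing the monitor's id and IP as scalars instead of the Monitor entity is my own choice rather than something stated above.

```php
<?php
// Sketch only: needs the pthreads extension and the Attempt entity from the question.

/**
 * Hypothetical helper, not part of Doctrine or pthreads: build a brand-new
 * EntityManager (with its own database connection) for the calling thread.
 */
function createEntityManager()
{
    // Assumption: your bootstrap goes here, e.g. loading the autoloader and
    // creating the EntityManager from the application's configuration.
    throw new \LogicException('Replace with your own Doctrine bootstrap.');
}

class ProbeWorker extends \Worker
{
    /** Static scope: not serialized by pthreads, one instance per thread. */
    public static $em;

    public function run()
    {
        // Executed inside the worker thread, so both handles belong to it.
        $this->ch = curl_init();
        curl_setopt($this->ch, CURLOPT_RETURNTRANSFER, true);
        self::$em = createEntityManager();
    }
}

class ProbeJob extends \Stackable
{
    // Assumption: pass scalars instead of the Monitor entity, so nothing tied
    // to the parent thread's EntityManager is copied into the job.
    public function __construct($monitorId, $ip)
    {
        $this->monitorId = $monitorId;
        $this->ip = $ip;
    }

    public function run()
    {
        $ch = $this->worker->ch;   // curl handle created in ProbeWorker::run()
        $em = ProbeWorker::$em;    // this thread's own EntityManager

        curl_setopt($ch, CURLOPT_URL, $this->ip);
        $request = json_decode(curl_exec($ch), true);

        $em->persist(
            (new Attempt())
                ->setMonitor($em->getReference('WowABundle:Monitor', $this->monitorId))
                ->setTimestamp(new \DateTime())
                ->setLatency($request['latency'])
        );
        $em->flush();
    }
}
```

In the command, the worker would then be created without arguments (`new ProbeWorker()`) and each job stacked as `new ProbeJob($monitor->getId(), $monitor->getIp())`, where `getId()` is assumed to exist on Monitor.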