📜  自然语言处理 |如何使用 Execnet 和 Redis 对单词进行评分

📅  最后修改于: 2022-05-13 01:55:45.760000             🧑  作者: Mango

自然语言处理 |如何使用 Execnet 和 Redis 对单词进行评分

可以同时使用 Redis 和 Execnet 执行分布式单词评分。对于movie_reviews语料库中的每个单词,使用FreqDist和ConditionalFreqDist计算信息增益。
使用 >RedisHashFreqDist 和RedisConditionalHashFreqDist ,可以使用 Redis 执行相同的操作。然后将分数存储在RedisOrderedDict中。为了从 Redis 中获得更好的性能,使用 Execnet 来分配计数。
Redis 和 execnet 用于一起做分布式单词评分。使用 FreqDist 和 ConditionalFreqDist 计算每个单词的信息增益。现在使用 Redis,可以使用RedisHashFreqDistRedisConditionalHashFreqDist执行相同的操作,然后将分数存储在RedisOrderedDict中。 Execnet 可用于分配计数,以便从 Redis 中获得更好的性能。安装 Redis 和 execnet 后, redis-server的实例必须在 localhost 上运行。
脚步:

  • 对于 movie_reviews 语料库中的每个标签(只有 pos 和 neg 标签),首先获取一个元组列表——标签和单词。
  • 然后从 dist_featx 模块中,使用 score_words() 获取 word_scores。
  • 单词总数为 39、764,word_scores函数是RedisOrderedDict的一个实例。
  • 然后获取前 1, 000 个单词并使用 keys() 方法检查前 5 个单词以查看它们是什么。
  • 从 word_scores 获得所有需要的信息后删除 Redis 中的键,因为不再需要数据。

代码 :

Python3
# importing libraries
from dist_featx import score_words
from nltk.corpus import movie_reviews
 
# finding category via categoreies
category = movie_reviews.categories()
 
print ("Categories : ", category)
category_words = [
        (l, movie_reviews.words(categories = [l]))
        for l in category]
 
# Scores
word_scores = score_words(category_words)
print ("Length : ", len(word_scores))
 
# top words
topn_words = word_scores.keys(end = 1000)
print ("Top Words : ", topn_words[0:5])
 
# Delete the keys in Redis after getting
# all the required from word_scores
from redis import Redis
r = Redis()
print ([r.delete(key) for
     key in ['word_fd', 'label_word_fd:neg',
             'label_word_fd:pos', 'word_scores']] )


Python3
# Importing library
import itertools, execnet, remote_word_count
from nltk.metrics import BigramAssocMeasures
from redis import Redis
from redisprob import RedisHashFreqDist, RedisConditionalHashFreqDist
from rediscollections import RedisOrderedDict
 
# Scoring the words
def score_words(category_words,
                score_fn = BigramAssocMeasures.chi_sq,
                host ='localhost', specs =[('popen', 2)]):
    gateways = []
    channels = []
     
    # counting
    for spec, count in specs:
        for i in range(count):
            gw = execnet.makegateway(spec)
            gateways.append(gw)
            channel = gw.remote_exec(remote_word_count)
            channel.send((host, 'word_fd', 'category_word_fd'))
            channels.append(channel)
             
    cyc = itertools.cycle(channels)
     
    # syncing the channel
    for category, words in category_words:
        channel = next(cyc)
        channel.send((category, list(words)))
         
    for channel in channels:
        channel.send('done')
        assert 'done' == channel.receive()
        channel.waitclose(5)
         
    for gateway in gateways:
        gateway.exit()
         
    r = Redis(host)
    # frequency distribution
    fd = RedisHashFreqDist(r, 'word_fd')
    cfd = RedisConditionalHashFreqDist(r, 'category_word_fd')
    word_scores = RedisOrderedDict(r, 'word_scores')
    n_xx = cfd.N()
     
    for category in cfd.conditions():
        n_xi = cfd[category].N()
     
    for word, n_ii in cfd[category].iteritems():
        word = word.decode()
        n_ix = fd[word]
         
        if n_ii and n_ix and n_xi and n_xx:
            score = score_fn(n_ii, (n_ix, n_xi), n_xx)
            word_scores[word] = score
    # final word scores       
    return word_scores


输出 :

Categories :  ['neg', 'pos']
Length : 39767
Top Words : [b'bad', b', ', b'and', b'?', b'movie']
[1, 1, 1, 1]

score_words()是 dist_featx 的一个函数。但预计会等待一段时间,因为它需要一些时间才能完成。使用 execnet 和 Redis 的开销意味着它比函数的非分布式内存版本花费的时间要长得多。
这个怎么运作?
dist_featx.py模块包含 score_words()函数,它执行以下操作:

  • 打开网关和通道。
  • 将初始化数据发送到每个通道。
  • 为了计数,它通过一个通道发送每个(标签,单词)元组。
  • 向每个通道发送完成消息。
  • 等待完成回复。
  • 关闭通道和网关。
  • 根据计数计算每个单词的分数。
  • 将分数存储在 RedisOrderedDict 中。

计数完成后,对所有单词进行评分并存储结果。代码如下:
代码 :

Python3

# Importing library
import itertools, execnet, remote_word_count
from nltk.metrics import BigramAssocMeasures
from redis import Redis
from redisprob import RedisHashFreqDist, RedisConditionalHashFreqDist
from rediscollections import RedisOrderedDict
 
# Scoring the words
def score_words(category_words,
                score_fn = BigramAssocMeasures.chi_sq,
                host ='localhost', specs =[('popen', 2)]):
    gateways = []
    channels = []
     
    # counting
    for spec, count in specs:
        for i in range(count):
            gw = execnet.makegateway(spec)
            gateways.append(gw)
            channel = gw.remote_exec(remote_word_count)
            channel.send((host, 'word_fd', 'category_word_fd'))
            channels.append(channel)
             
    cyc = itertools.cycle(channels)
     
    # syncing the channel
    for category, words in category_words:
        channel = next(cyc)
        channel.send((category, list(words)))
         
    for channel in channels:
        channel.send('done')
        assert 'done' == channel.receive()
        channel.waitclose(5)
         
    for gateway in gateways:
        gateway.exit()
         
    r = Redis(host)
    # frequency distribution
    fd = RedisHashFreqDist(r, 'word_fd')
    cfd = RedisConditionalHashFreqDist(r, 'category_word_fd')
    word_scores = RedisOrderedDict(r, 'word_scores')
    n_xx = cfd.N()
     
    for category in cfd.conditions():
        n_xi = cfd[category].N()
     
    for word, n_ii in cfd[category].iteritems():
        word = word.decode()
        n_ix = fd[word]
         
        if n_ii and n_ix and n_xi and n_xx:
            score = score_fn(n_ii, (n_ix, n_xi), n_xx)
            word_scores[word] = score
    # final word scores       
    return word_scores

如果有两个以上的标签,则应使用不同的评分方法。要比较两个标签,评分方法只会是准确的。这些要求将决定您如何存储单词分数。
拥有实例后,可以通过通道接收两种数据 -

  1. 完成消息:它表示没有更多数据通过通道进入。
    回复另一个 done 消息,最后退出循环以关闭通道。
  2. (label, words) 的 2 元组:它用于迭代以增加RedisHashFreqDistRedisConditionalHashFreqDist中的计数